Cache: Add working directory parameter #446
Changes from all commits: 0f59f57, 6f9b59e, e19d4b3, 75f9c18
First changed file:
@@ -4,7 +4,7 @@
 import subprocess
 import sys
 from concurrent.futures import Future
-from typing import Tuple
+from typing import Optional, Tuple

 from executorlib.standalone.command import get_command_path
 from executorlib.standalone.hdf import dump, get_output
@@ -48,14 +48,17 @@ def done(self) -> bool:


 def execute_in_subprocess(
-    command: list, task_dependent_lst: list = []
+    command: list,
+    task_dependent_lst: list = [],
+    cwd: Optional[str] = None,
Comment on lines +51 to +53

Fix mutable default argument

Using mutable default arguments in Python can lead to unexpected behavior, as the list is created once when the function is defined, not each time it's called.

Replace with:

 def execute_in_subprocess(
     command: list,
-    task_dependent_lst: list = [],
+    task_dependent_lst: list = None,
     cwd: Optional[str] = None,
 ) -> subprocess.Popen:

And add at the beginning of the function:

    if task_dependent_lst is None:
        task_dependent_lst = []

🧰 Tools: 🪛 Ruff
 ) -> subprocess.Popen:
     """
     Execute a command in a subprocess.

     Args:
         command (list): The command to be executed.
-        task_dependent_lst (list, optional): A list of subprocesses that the current subprocess depends on. Defaults to [].
+        task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
+        cwd (str/None): current working directory where the parallel python task is executed

     Returns:
         subprocess.Popen: The subprocess object.
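As background for the review comment above, here is a minimal, standalone illustration of the mutable-default pitfall (plain Python, not executorlib code): the default list is created once when the function is defined, so every call that relies on the default shares the same object unless the None-sentinel pattern is used.

    def run_bad(task, dependents=[]):
        # The single default list is shared by every call that omits the argument.
        dependents.append(task)
        return dependents


    def run_good(task, dependents=None):
        # A fresh list is created on each call when no argument is passed.
        if dependents is None:
            dependents = []
        dependents.append(task)
        return dependents


    print(run_bad("a"), run_bad("b"))    # ['a', 'b'] ['a', 'b'] -- state leaked between calls
    print(run_good("a"), run_good("b"))  # ['a'] ['b']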
@@ -65,14 +68,15 @@ def execute_in_subprocess(
     task_dependent_lst = [
         task for task in task_dependent_lst if task.poll() is None
     ]
-    return subprocess.Popen(command, universal_newlines=True)
+    return subprocess.Popen(command, universal_newlines=True, cwd=cwd)


 def execute_tasks_h5(
     future_queue: queue.Queue,
     cache_directory: str,
     cores_per_worker: int,
     execute_function: callable,
+    cwd: Optional[str],
 ) -> None:
     """
     Execute tasks stored in a queue using HDF5 files.
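The only behavioral change in the hunk above is forwarding cwd to subprocess.Popen. A short standard-library sketch of what that keyword does (the directory below is just a stand-in):

    import subprocess
    import sys
    import tempfile

    workdir = tempfile.gettempdir()  # stand-in for the task's working directory

    # With cwd set, the child process starts in workdir instead of inheriting the
    # parent's current directory; with cwd=None it behaves exactly as before.
    proc = subprocess.Popen(
        [sys.executable, "-c", "import os; print(os.getcwd())"],
        universal_newlines=True,
        cwd=workdir,
        stdout=subprocess.PIPE,
    )
    print(proc.communicate()[0].strip())  # prints workdir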
@@ -82,6 +86,7 @@ def execute_tasks_h5(
         cache_directory (str): The directory to store the HDF5 files.
         cores_per_worker (int): The number of cores per worker.
         execute_function (callable): The function to execute the tasks.
+        cwd (str/None): current working directory where the parallel python task is executed

     Returns:
         None
@@ -123,6 +128,7 @@ def execute_tasks_h5(
                     task_dependent_lst=[
                         process_dict[k] for k in future_wait_key_lst
                     ],
+                    cwd=cwd,
                 )
                 file_name_dict[task_key] = os.path.join(
                     cache_directory, task_key + ".h5out"
Second changed file (tests):
@@ -19,6 +19,10 @@ def my_funct(a, b):
     return a + b


+def list_files_in_working_directory():
+    return os.listdir(os.getcwd())
+
+
 @unittest.skipIf(
     skip_h5io_test, "h5io is not installed, so the h5io tests are skipped."
 )
@@ -38,6 +42,12 @@ def test_executor_dependence_mixed(self):
         self.assertEqual(fs2.result(), 4)
         self.assertTrue(fs2.done())

+    def test_executor_working_directory(self):
+        cwd = os.path.join(os.path.dirname(__file__), "executables")
+        with FileExecutor(cwd=cwd) as exe:
+            fs1 = exe.submit(list_files_in_working_directory)
+            self.assertEqual(fs1.result(), os.listdir(cwd))
+
Comment on lines +45 to +50

💡 Codebase verification / 🛠️ Refactor suggestion

Add test cases for error scenarios with cwd parameter

The review comment is valid. The implementation accepts a cwd parameter without validating it.

The suggested test cases in the review comment appropriately cover:

- a non-existent directory,
- an invalid (empty) directory path,
- a directory without access permissions (Unix-like systems only).

🔗 Analysis chain

Add test cases for error scenarios.

The current test only covers the happy path. Consider adding test cases for non-existent directories, invalid directory paths, and directories without permissions.

Here's a suggested additional test:

    def test_executor_working_directory_errors(self):
        # Test non-existent directory
        with self.assertRaises(ValueError):
            with FileExecutor(cwd="/nonexistent/path"):
                pass

        # Test invalid directory path
        with self.assertRaises(ValueError):
            with FileExecutor(cwd=""):
                pass

        # Test directory without permissions (Unix-like systems only)
        if os.name != 'nt':
            no_access_dir = os.path.join(self.test_dir, "no_access")
            os.makedirs(no_access_dir, mode=0o000)
            try:
                with self.assertRaises(PermissionError):
                    with FileExecutor(cwd=no_access_dir):
                        pass
            finally:
                os.chmod(no_access_dir, 0o755)  # Restore permissions for cleanup
                os.rmdir(no_access_dir)

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check if there are any existing error handling tests for cwd parameter
rg -l "test.*working.*directory.*error" tests/
rg -l "test.*invalid.*directory" tests/

Length of output: 90

Script:

#!/bin/bash
# Let's check for any existing error handling tests for FileExecutor initialization
rg -A 10 "FileExecutor.*cwd" tests/

Length of output: 891

Script:

#!/bin/bash
# Let's check the FileExecutor implementation to understand error handling
rg -A 10 "class FileExecutor" .

Length of output: 678

Script:

#!/bin/bash
# Let's check the implementation details of FileExecutor to see if there's any validation
rg -B 5 -A 15 "def __init__" ./executorlib/cache/executor.py

Length of output: 929
     def test_executor_function(self):
         fs1 = Future()
         q = Queue()
@@ -51,13 +61,15 @@ def test_executor_function(self):
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
                 "cores_per_worker": 1,
+                "cwd": None,
             },
         )
         process.start()
         self.assertFalse(fs1.done())
         self.assertEqual(fs1.result(), 3)
         self.assertTrue(fs1.done())
         q.put({"shutdown": True, "wait": True})
         process.join()

     def test_executor_function_dependence_kwargs(self):
         fs1 = Future()
@@ -74,13 +86,15 @@ def test_executor_function_dependence_kwargs(self):
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
                 "cores_per_worker": 1,
+                "cwd": None,
             },
         )
         process.start()
         self.assertFalse(fs2.done())
         self.assertEqual(fs2.result(), 4)
         self.assertTrue(fs2.done())
         q.put({"shutdown": True, "wait": True})
         process.join()

     def test_executor_function_dependence_args(self):
         fs1 = Future()
@@ -97,13 +111,15 @@ def test_executor_function_dependence_args(self):
                 "cache_directory": cache_dir,
                 "execute_function": execute_in_subprocess,
                 "cores_per_worker": 1,
+                "cwd": None,
             },
         )
         process.start()
         self.assertFalse(fs2.done())
         self.assertEqual(fs2.result(), 5)
         self.assertTrue(fs2.done())
         q.put({"shutdown": True, "wait": True})
         process.join()

     def tearDown(self):
         if os.path.exists("cache"):
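Taken together with test_executor_working_directory above, the user-facing effect of this PR is that FileExecutor accepts a cwd keyword and submitted callables run with that directory as their working directory. A hedged usage sketch follows; the import path is inferred from the verification scripts above (executorlib/cache/executor.py) and the directory is hypothetical:

    import os

    from executorlib.cache.executor import FileExecutor  # module path per the rg output above


    def current_directory():
        return os.getcwd()


    if __name__ == "__main__":
        workdir = "/path/to/project"  # hypothetical; must already exist on your machine
        with FileExecutor(cwd=workdir) as exe:
            fut = exe.submit(current_directory)
            print(fut.result())  # expected to equal workdir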
🛠️ Refactor suggestion

Consider adding working directory validation

While the implementation is correct, it might be helpful to validate that the working directory exists before passing it to the execution context, to fail fast with a clear error message.

Consider adding validation in __init__:
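The reviewer's suggested snippet is not included in the captured text. A minimal sketch of what such fail-fast validation could look like, assuming a helper called from __init__ before cwd is handed to the execution context (the helper name and error types are illustrative, not the actual executorlib implementation):

    import os
    from typing import Optional


    def validate_cwd(cwd: Optional[str]) -> Optional[str]:
        """Fail fast with a clear message if the requested working directory is unusable."""
        if cwd is None:
            return None  # keep the default: inherit the parent's working directory
        if not os.path.isdir(cwd):
            raise ValueError(f"Working directory does not exist: {cwd!r}")
        if not os.access(cwd, os.R_OK | os.X_OK):
            raise PermissionError(f"Working directory is not accessible: {cwd!r}")
        return cwd

Calling a helper like this at the top of __init__ would report a bad path at construction time rather than when the first subprocess fails to start, which also matches the error types assumed in the suggested error-scenario tests above.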