-
Notifications
You must be signed in to change notification settings - Fork 4
Cache: Implement resource dict for submit() #451
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes in this pull request involve updates to the Changes
Possibly related PRs
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Outside diff range and nitpick comments (8)
executorlib/standalone/cache/spawner.py (2)
9-9
: Improve type hints and docstring accuracy.The docstring example contains incorrect Python syntax for the dictionary key, and the documentation could be more specific about supported resource types.
Consider these improvements:
- resource_dict: Optional[dict] = None, + resource_dict: Optional[dict[str, Optional[str]]] = None,- cwd: None, + "cwd": None,Also, consider expanding the docstring to list all supported resource types that might be added in the future (e.g., threads, GPUs) to maintain documentation completeness.
Also applies to: 17-20
Line range hint
9-32
: Consider introducing a ResourceManager class for better extensibility.As this PR implements resource management functionality, consider creating a dedicated
ResourceManager
class to handle resource validation, defaults, and future extensions (threads, GPUs, etc.). This would provide a more maintainable and extensible solution.Example structure:
class ExecutionResources: def __init__(self, cwd: Optional[str] = None, **kwargs): self.cwd = cwd self.additional_resources = kwargs @classmethod def from_dict(cls, resource_dict: Optional[dict] = None) -> 'ExecutionResources': if resource_dict is None: return cls() return cls(**resource_dict) def validate(self) -> None: if not (self.cwd is None or isinstance(self.cwd, str)): raise TypeError("cwd must be None or a string")This would make it easier to add new resource types and their validation in the future.
🧰 Tools
🪛 Ruff
8-8: Do not use mutable data structures for argument defaults
Replace with
None
; initialize within function(B006)
executorlib/cache/executor.py (2)
18-21
: Consider reordering parameters to improve readability.The parameter order could be improved by:
- Grouping related resource parameters together (cores_per_worker, threads_per_core, gpus_per_worker)
- Moving callback parameters (execute_function, terminate_function) to the end
Consider this order:
def __init__( self, cache_directory: str = "cache", cores_per_worker: int = 1, threads_per_core: int = 1, gpus_per_worker: int = 0, cwd: Optional[str] = None, - execute_function: callable = execute_in_subprocess, - terminate_function: Optional[callable] = None, + execute_function: callable = execute_in_subprocess, + terminate_function: Optional[callable] = None, ):
32-34
: Improve docstring formatting and completeness.The docstring has inconsistent formatting and could be more detailed:
- Type hints are missing in parameter descriptions
- Formatting differs from existing parameters
- The cwd description is incomplete
Consider updating the docstring:
""" Initialize the FileExecutor. Args: cache_directory (str, optional): The directory to store cache files. Defaults to "cache". cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1. - threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 - cwd (str/None): current working directory where the parallel python task is executed + threads_per_core (int, optional): Number of OpenMP threads to be used for each function call. Defaults to 1. + gpus_per_worker (int, optional): Number of GPUs to allocate per worker. Defaults to 0. + cwd (Optional[str]): Current working directory for executing parallel Python tasks. Defaults to None. execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess. terminate_function (callable, optional): The function to terminate the tasks. """tests/test_cache_executor_serial.py (2)
76-76
: Consider adding test coverage for new resource parameters.While the change correctly implements the resource_dict, the test could be enhanced to verify the new parameters mentioned in the changes (
threads_per_core
andgpus_per_worker
).Would you like me to help generate additional test cases for these parameters?
Line range hint
1-176
: Consider restructuring tests for comprehensive resource management coverage.While the current changes correctly implement the resource_dict, the test suite could benefit from a dedicated test class focusing on resource management scenarios. This would help ensure the reliability of the new features like
threads_per_core
andgpus_per_worker
.Suggested test scenarios:
- Resource allocation and constraints
- Multi-threading behavior
- GPU worker distribution
- Resource conflict resolution
- Error handling for invalid configurations
This separation of concerns would make the test suite more maintainable and provide better coverage for the new functionality.
Would you like me to help design the structure for these additional test cases?
executorlib/cache/shared.py (2)
66-67
: Enhance parameter documentation.While the documentation follows the existing format, consider adding more details:
- For
threads_per_core
: Specify if this is specifically for OpenMP threading, and any limitations or requirements.- For
gpus_per_worker
: Add information about GPU requirements, allocation strategy, and what happens if GPUs are unavailable.- threads_per_core (int): number of OpenMP threads to be used for each function call - gpus_per_worker (int): number of GPUs per worker - defaults to 0 + threads_per_core (int): Number of OpenMP threads per CPU core. Each function call + will use (cores_per_worker * threads_per_core) total threads. Requires OpenMP support. + gpus_per_worker (int): Number of GPUs to allocate per worker process. If GPUs are + unavailable, execution falls back to CPU-only mode. Defaults to 0.
Line range hint
54-127
: Document resource management strategy.The introduction of granular resource control (threads, GPUs) is a significant architectural change. Consider:
- Adding a module-level docstring explaining the resource management strategy
- Creating a configuration guide for optimal resource allocation
- Documenting the relationship between
cores_per_worker
,threads_per_core
, andgpus_per_worker
This will help users understand how to effectively configure resources for their workloads.
🧰 Tools
🪛 Ruff
115-115: Use
key not in dict
instead ofkey not in dict.keys()
Remove
.keys()
(SIM118)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- executorlib/cache/executor.py (3 hunks)
- executorlib/cache/shared.py (4 hunks)
- executorlib/standalone/cache/spawner.py (2 hunks)
- tests/test_cache_executor_serial.py (3 hunks)
🔇 Additional comments (5)
executorlib/cache/executor.py (2)
49-50
: LGTM! Resource parameters are properly passed to execute_tasks_h5.The implementation correctly forwards the new resource configuration parameters to the execution layer.
18-19
: Verify the resource parameter usage.Let's ensure these new resource parameters are properly handled in the execution layer.
✅ Verification successful
Based on the search results, I can now provide a final response about the resource parameter usage:
Resource parameters are properly integrated across the codebase
The parameters
threads_per_core
andgpus_per_worker
are well-integrated throughout the codebase:
- They are properly propagated through the executor chain from high-level interfaces to low-level implementations
- Input validation is implemented via
check_threads_per_core()
andcheck_gpus_per_worker()
- The parameters are correctly handled in resource dictionary construction and task execution
- Documentation and examples demonstrate proper usage patterns
- Test coverage exists for both parameters in various scenarios
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Check how the new resource parameters are used in execute_tasks_h5 and related functions # Check for threads_per_core usage echo "Checking threads_per_core usage:" rg -A 5 "threads_per_core" --type python # Check for gpus_per_worker usage echo "Checking gpus_per_worker usage:" rg -A 5 "gpus_per_worker" --type python # Check for resource dictionary construction echo "Checking resource_dict usage:" rg -A 5 "resource_dict" --type pythonLength of output: 428
Script:
#!/bin/bash # Description: Check how the new resource parameters are used in execute_tasks_h5 and related functions # Check for threads_per_core usage echo "Checking threads_per_core usage:" rg -A 5 "threads_per_core" # Check for gpus_per_worker usage echo "Checking gpus_per_worker usage:" rg -A 5 "gpus_per_worker" # Check for resource dictionary construction echo "Checking resource_dict usage:" rg -A 5 "resource_dict" # Check the executor class implementation echo "Checking executor class implementation:" ast-grep --pattern $'class Executor { $$$ }'Length of output: 70727
tests/test_cache_executor_serial.py (1)
118-118
: LGTM!The resource_dict implementation is consistent with the previous test method.
executorlib/cache/shared.py (2)
54-56
: LGTM: Well-structured parameter additions.The new parameters are properly typed and have sensible defaults. The parameter ordering is logical, grouping compute resources together.
113-113
: Verify resource dictionary handling in downstream functions.The resource dictionary is now passed to both
serialize_funct_h5
andexecute_function
. Let's verify that these functions properly handle the resource parameters.Also applies to: 127-127
✅ Verification successful
Let me generate additional verification scripts to check the implementation of
serialize_funct_h5
which wasn't found in the previous results.
Based on the verification results, I can now generate the final response:
Resource dictionary handling is properly implemented across the execution chain
The resource dictionary is correctly:
- Accepted as a parameter in
serialize_funct_h5
function- Handled in the interactive executor's submit method with proper validation (
check_resource_dict
)- Propagated through task execution in
executorlib/interactive/shared.py
- Used in test cases with both empty and specific resource configurations
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check how serialize_funct_h5 and execute_function handle resource_dict echo "Checking serialize_funct_h5 implementation..." ast-grep --pattern 'def serialize_funct_h5($$$resource_dict$$$)' -A 10 echo "Checking execute_function implementations..." rg -l "def.*execute.*function" | xargs rg "resource_dict" -A 5Length of output: 6921
Script:
#!/bin/bash # Search for serialize_funct_h5 implementation echo "Searching for serialize_funct_h5 implementation..." rg "def serialize_funct_h5" -A 10 # Check for resource_dict usage in cache-related files echo "Checking resource_dict usage in cache files..." fd cache --type f | xargs rg "resource_dict" -A 5Length of output: 4353
if resource_dict is None: | ||
resource_dict = {"cwd": None} | ||
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add validation for resource dictionary access.
The current implementation might raise KeyError if the provided dictionary doesn't contain the 'cwd' key, and there's no validation of the value type.
Consider adding validation:
if resource_dict is None:
resource_dict = {"cwd": None}
+ if "cwd" not in resource_dict:
+ resource_dict["cwd"] = None
+ elif not (resource_dict["cwd"] is None or isinstance(resource_dict["cwd"], str)):
+ raise TypeError("cwd must be None or a string")
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if resource_dict is None: | |
resource_dict = {"cwd": None} | |
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"]) | |
if resource_dict is None: | |
resource_dict = {"cwd": None} | |
if "cwd" not in resource_dict: | |
resource_dict["cwd"] = None | |
elif not (resource_dict["cwd"] is None or isinstance(resource_dict["cwd"], str)): | |
raise TypeError("cwd must be None or a string") | |
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"]) |
tests/test_cache_executor_serial.py
Outdated
@@ -157,7 +157,7 @@ def test_executor_function_dependence_args(self): | |||
"cache_directory": cache_dir, | |||
"execute_function": execute_in_subprocess, | |||
"cores_per_worker": 1, | |||
"cwd": None, | |||
"resource_dict": {"cwd": None}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Review comment is accurate - resource management test coverage needs improvement
The codebase has limited resource management testing:
- Most tests only use empty resource dictionaries
{}
or basic{"cwd": None}
test_shared_input_check.py
only validates input types/formatstest_dependencies_executor.py
has basic core allocation tests- No comprehensive tests for:
- Thread and GPU resource allocation behaviors
- Resource constraint handling
- Error cases with invalid configurations
- Resource cleanup and release
- Resource contention scenarios
🔗 Analysis chain
LGTM! Consider adding resource management integration tests.
The resource_dict implementation is consistent. However, since this PR introduces significant resource management changes, consider adding integration tests that verify:
- Resource allocation behavior with different thread and GPU configurations
- Resource constraints and their impact on task execution
- Error handling for invalid resource configurations
Let's check if there are any existing resource management tests:
Let's examine the actual test coverage of resource management in these files.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing resource management tests
rg -l "threads_per_core|gpus_per_worker|resource_dict" "tests/"
Length of output: 310
Script:
#!/bin/bash
# Check resource management test coverage
rg -A 5 "resource_dict|threads_per_core|gpus_per_worker" "tests/" --type py
Length of output: 13295
executorlib/cache/shared.py
Outdated
resource_dict = task_dict["resource_dict"] | ||
if "cores" not in resource_dict: | ||
resource_dict["cores"] = cores_per_worker | ||
if "gpus_per_core" not in resource_dict: | ||
resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker) | ||
if "threads_per_core" not in resource_dict: | ||
resource_dict["threads_per_core"] = threads_per_core | ||
if "cwd" not in resource_dict: | ||
resource_dict["cwd"] = cwd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add input validation and handle edge cases in resource allocation.
The current resource dictionary construction has several potential issues:
- Integer division in
gpus_per_worker / cores_per_worker
could result in zero GPUs per core when the ratio is non-zero but < 1 - No validation of input values
- No handling of edge cases
Consider adding these safety checks:
resource_dict = task_dict["resource_dict"]
+if cores_per_worker <= 0:
+ raise ValueError("cores_per_worker must be positive")
+if gpus_per_worker < 0:
+ raise ValueError("gpus_per_worker cannot be negative")
+if threads_per_core <= 0:
+ raise ValueError("threads_per_core must be positive")
if "cores" not in resource_dict:
resource_dict["cores"] = cores_per_worker
if "gpus_per_core" not in resource_dict:
- resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
+ # Use float division and round up to ensure fair GPU distribution
+ resource_dict["gpus_per_core"] = max(1, int((gpus_per_worker + cores_per_worker - 1) // cores_per_worker)) if gpus_per_worker > 0 else 0
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
resource_dict = task_dict["resource_dict"] | |
if "cores" not in resource_dict: | |
resource_dict["cores"] = cores_per_worker | |
if "gpus_per_core" not in resource_dict: | |
resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker) | |
if "threads_per_core" not in resource_dict: | |
resource_dict["threads_per_core"] = threads_per_core | |
if "cwd" not in resource_dict: | |
resource_dict["cwd"] = cwd | |
resource_dict = task_dict["resource_dict"] | |
if cores_per_worker <= 0: | |
raise ValueError("cores_per_worker must be positive") | |
if gpus_per_worker < 0: | |
raise ValueError("gpus_per_worker cannot be negative") | |
if threads_per_core <= 0: | |
raise ValueError("threads_per_core must be positive") | |
if "cores" not in resource_dict: | |
resource_dict["cores"] = cores_per_worker | |
if "gpus_per_core" not in resource_dict: | |
# Use float division and round up to ensure fair GPU distribution | |
resource_dict["gpus_per_core"] = max(1, int((gpus_per_worker + cores_per_worker - 1) // cores_per_worker)) if gpus_per_worker > 0 else 0 | |
if "threads_per_core" not in resource_dict: | |
resource_dict["threads_per_core"] = threads_per_core | |
if "cwd" not in resource_dict: | |
resource_dict["cwd"] = cwd |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (1)
executorlib/standalone/cache/spawner.py (1)
9-9
: Enhance docstring and type hints for better clarity.While the change to use
resource_dict
improves extensibility, the documentation could be more comprehensive.Consider these improvements:
- resource_dict: Optional[dict] = None, + resource_dict: Optional[dict[str, Optional[str]]] = None,resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. Example resource dictionary: { cwd: None, } + Keys: + cwd (Optional[str]): Working directory for the subprocess. + If None, uses the current working directory.Also applies to: 17-20
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- executorlib/cache/executor.py (2 hunks)
- executorlib/cache/shared.py (3 hunks)
- executorlib/standalone/cache/spawner.py (2 hunks)
- tests/test_cache_executor_serial.py (0 hunks)
💤 Files with no reviewable changes (1)
- tests/test_cache_executor_serial.py
🚧 Files skipped from review as they are similar to previous changes (2)
- executorlib/cache/executor.py
- executorlib/cache/shared.py
🧰 Additional context used
🪛 Ruff
executorlib/standalone/cache/spawner.py
30-33: Combine
if
branches using logicalor
operatorCombine
if
branches(SIM114)
if resource_dict is None: | ||
resource_dict = {"cwd": None} | ||
elif len(resource_dict) == 0: | ||
resource_dict = {"cwd": None} | ||
return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Simplify conditions and future-proof resource handling.
The current implementation can be simplified and made more robust.
Consider these improvements:
- if resource_dict is None:
- resource_dict = {"cwd": None}
- elif len(resource_dict) == 0:
- resource_dict = {"cwd": None}
+ if resource_dict is None or len(resource_dict) == 0:
+ resource_dict = {"cwd": None}
+ # Create a copy to avoid modifying the input dictionary
+ resource_dict = resource_dict.copy()
Additionally, consider defining a set of supported resource keys as constants to make the code more maintainable as new resource types are added in the future:
SUPPORTED_RESOURCES = {"cwd"}
🧰 Tools
🪛 Ruff
30-33: Combine
if
branches using logicalor
operatorCombine
if
branches(SIM114)
Summary by CodeRabbit
New Features
Bug Fixes
cwd
parameter across various functions to ensure consistency and backward compatibility in resource management.Tests