
Cache: Implement resource dict for submit() #451


Merged: 3 commits, Oct 27, 2024

Conversation

Member

jan-janssen commented Oct 27, 2024

Summary by CodeRabbit

  • New Features

    • Enhanced configurability of task execution with updated parameters for specifying execution resources.
    • Improved resource management for task execution through the introduction of a centralized resource dictionary.
  • Bug Fixes

    • Adjusted handling of the cwd parameter across various functions to ensure consistency and backward compatibility in resource management.
  • Tests

    • Updated test cases to reflect changes in parameter handling, ensuring continued validation of functionality and resource management.

Contributor

coderabbitai bot commented Oct 27, 2024

Walkthrough

The changes in this pull request involve updates to the FileExecutor class and related functions to enhance resource management during task execution. The execute_function parameter has been repositioned in the FileExecutor constructor, which now includes a conditional assignment for terminate_function. The execute_tasks_h5 function has been modified to make cores_per_worker optional and to utilize a new resource_dict for managing execution resources. Additionally, the execute_in_subprocess function has been updated to replace cwd with resource_dict, improving configurability while maintaining backward compatibility.
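As an illustration of the new call pattern, here is a minimal sketch (the command list and working directory are placeholders, and the keyword names are assumed from the diffs shown later in this review):

from executorlib.standalone.cache.spawner import execute_in_subprocess

# The working directory used to be passed as cwd=...; it now travels inside resource_dict.
process = execute_in_subprocess(
    command=["python", "my_task.py"],           # placeholder command
    resource_dict={"cwd": "/tmp/example_run"},  # placeholder path
)
process.wait()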

Changes

  • executorlib/cache/executor.py: Updated the FileExecutor constructor to reposition execute_function and include a conditional assignment for terminate_function. Adjusted the docstring for clarity.
  • executorlib/cache/shared.py: Modified the execute_tasks_h5 function to make cores_per_worker optional (default 1) and cwd default to None. Introduced resource_dict for resource management.
  • executorlib/standalone/cache/spawner.py: Changed the execute_in_subprocess function to replace cwd with resource_dict. Updated the docstring to reflect this change and adjusted the handling of cwd within the function.
  • tests/test_cache_executor_serial.py: Altered test methods to encapsulate cwd within a resource_dict as {"cwd": None} for RaisingThread instantiation. Maintained the overall test structure and logic.

Possibly related PRs

🐰 In the fields where I hop and play,
New threads and GPUs come into play.
With a resource dict, oh what a delight,
Task execution now feels just right!
So let's celebrate with a joyful cheer,
For flexible coding is finally here! 🎉


Contributor

coderabbitai bot left a comment

Actionable comments posted: 3

🧹 Outside diff range and nitpick comments (8)
executorlib/standalone/cache/spawner.py (2)

9-9: Improve type hints and docstring accuracy.

The docstring example contains incorrect Python syntax for the dictionary key, and the documentation could be more specific about supported resource types.

Consider these improvements:

-    resource_dict: Optional[dict] = None,
+    resource_dict: Optional[dict[str, Optional[str]]] = None,
-                                  cwd: None,
+                                  "cwd": None,

Also, consider expanding the docstring to list all supported resource types that might be added in the future (e.g., threads, GPUs) to maintain documentation completeness.

Also applies to: 17-20


Line range hint 9-32: Consider introducing a ResourceManager class for better extensibility.

As this PR implements resource management functionality, consider creating a dedicated ResourceManager class to handle resource validation, defaults, and future extensions (threads, GPUs, etc.). This would provide a more maintainable and extensible solution.

Example structure:

class ExecutionResources:
    def __init__(self, cwd: Optional[str] = None, **kwargs):
        self.cwd = cwd
        self.additional_resources = kwargs

    @classmethod
    def from_dict(cls, resource_dict: Optional[dict] = None) -> 'ExecutionResources':
        if resource_dict is None:
            return cls()
        return cls(**resource_dict)

    def validate(self) -> None:
        if not (self.cwd is None or isinstance(self.cwd, str)):
            raise TypeError("cwd must be None or a string")

This would make it easier to add new resource types and their validation in the future.
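A possible usage of this sketch (hypothetical, since ExecutionResources is only proposed here and does not exist in the codebase; command and subprocess refer to the spawner's existing context):

# Build and validate the resources for a single task, then hand cwd to the spawner
resources = ExecutionResources.from_dict({"cwd": "/tmp/example_run"})  # placeholder path
resources.validate()
process = subprocess.Popen(command, universal_newlines=True, cwd=resources.cwd)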

🧰 Tools
🪛 Ruff

8-8: Do not use mutable data structures for argument defaults

Replace with None; initialize within function

(B006)

executorlib/cache/executor.py (2)

18-21: Consider reordering parameters to improve readability.

The parameter order could be improved by:

  1. Grouping related resource parameters together (cores_per_worker, threads_per_core, gpus_per_worker)
  2. Moving callback parameters (execute_function, terminate_function) to the end

Consider this order:

def __init__(
    self,
    cache_directory: str = "cache",
    cores_per_worker: int = 1,
    threads_per_core: int = 1,
    gpus_per_worker: int = 0,
    cwd: Optional[str] = None,
    execute_function: callable = execute_in_subprocess,
    terminate_function: Optional[callable] = None,
):

32-34: Improve docstring formatting and completeness.

The docstring has inconsistent formatting and could be more detailed:

  1. Type hints are missing in parameter descriptions
  2. Formatting differs from existing parameters
  3. The cwd description is incomplete

Consider updating the docstring:

"""
Initialize the FileExecutor.

Args:
    cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
    cores_per_worker (int, optional): The number of CPU cores per worker. Defaults to 1.
-   threads_per_core (int): number of OpenMP threads to be used for each function call
-   gpus_per_worker (int): number of GPUs per worker - defaults to 0
-   cwd (str/None): current working directory where the parallel python task is executed
+   threads_per_core (int, optional): Number of OpenMP threads to be used for each function call. Defaults to 1.
+   gpus_per_worker (int, optional): Number of GPUs to allocate per worker. Defaults to 0.
+   cwd (Optional[str]): Current working directory for executing parallel Python tasks. Defaults to None.
    execute_function (callable, optional): The function to execute tasks. Defaults to execute_in_subprocess.
    terminate_function (callable, optional): The function to terminate the tasks.
"""
tests/test_cache_executor_serial.py (2)

76-76: Consider adding test coverage for new resource parameters.

While the change correctly implements the resource_dict, the test could be enhanced to verify the new parameters mentioned in the changes (threads_per_core and gpus_per_worker).

Would you like me to help generate additional test cases for these parameters?


Line range hint 1-176: Consider restructuring tests for comprehensive resource management coverage.

While the current changes correctly implement the resource_dict, the test suite could benefit from a dedicated test class focusing on resource management scenarios. This would help ensure the reliability of the new features like threads_per_core and gpus_per_worker.

Suggested test scenarios:

  1. Resource allocation and constraints
  2. Multi-threading behavior
  3. GPU worker distribution
  4. Resource conflict resolution
  5. Error handling for invalid configurations

This separation of concerns would make the test suite more maintainable and provide better coverage for the new functionality.

Would you like me to help design the structure for these additional test cases?
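As a starting point, a minimal sketch of such a test module (assuming execute_in_subprocess keeps the command/resource_dict keywords shown in this diff; the command itself is a POSIX placeholder):

import unittest

from executorlib.standalone.cache.spawner import execute_in_subprocess


class TestSpawnerResourceDict(unittest.TestCase):
    def test_default_resource_dict(self):
        # No resource_dict given: the subprocess should fall back to cwd=None
        process = execute_in_subprocess(command=["true"])
        self.assertEqual(process.wait(), 0)

    def test_explicit_cwd(self):
        # Working directory supplied through the resource dictionary
        process = execute_in_subprocess(command=["true"], resource_dict={"cwd": "."})
        self.assertEqual(process.wait(), 0)


if __name__ == "__main__":
    unittest.main()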

executorlib/cache/shared.py (2)

66-67: Enhance parameter documentation.

While the documentation follows the existing format, consider adding more details:

  • For threads_per_core: Specify if this is specifically for OpenMP threading, and any limitations or requirements.
  • For gpus_per_worker: Add information about GPU requirements, allocation strategy, and what happens if GPUs are unavailable.
-        threads_per_core (int): number of OpenMP threads to be used for each function call
-        gpus_per_worker (int): number of GPUs per worker - defaults to 0
+        threads_per_core (int): Number of OpenMP threads per CPU core. Each function call
+            will use (cores_per_worker * threads_per_core) total threads. Requires OpenMP support.
+        gpus_per_worker (int): Number of GPUs to allocate per worker process. If GPUs are
+            unavailable, execution falls back to CPU-only mode. Defaults to 0.

Line range hint 54-127: Document resource management strategy.

The introduction of granular resource control (threads, GPUs) is a significant architectural change. Consider:

  1. Adding a module-level docstring explaining the resource management strategy
  2. Creating a configuration guide for optimal resource allocation
  3. Documenting the relationship between cores_per_worker, threads_per_core, and gpus_per_worker

This will help users understand how to effectively configure resources for their workloads.
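One possible shape for such a module-level note, based only on the defaulting logic visible in this diff (an illustrative draft, not the module's actual docstring):

"""
Resource handling for cached task execution (illustrative draft).

Missing keys in a task's resource_dict are filled from the worker-level defaults:

    cores            -> cores_per_worker
    threads_per_core -> threads_per_core
    gpus_per_core    -> gpus_per_worker // cores_per_worker  (integer division)
    cwd              -> cwd

A single function call therefore uses roughly cores * threads_per_core OpenMP threads.
"""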

🧰 Tools
🪛 Ruff

115-115: Use key not in dict instead of key not in dict.keys()

Remove .keys()

(SIM118)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR, between commits f661177 and 4f91a27.

📒 Files selected for processing (4)
  • executorlib/cache/executor.py (3 hunks)
  • executorlib/cache/shared.py (4 hunks)
  • executorlib/standalone/cache/spawner.py (2 hunks)
  • tests/test_cache_executor_serial.py (3 hunks)
🔇 Additional comments (5)
executorlib/cache/executor.py (2)

49-50: LGTM! Resource parameters are properly passed to execute_tasks_h5.

The implementation correctly forwards the new resource configuration parameters to the execution layer.


18-19: Verify the resource parameter usage.

Let's ensure these new resource parameters are properly handled in the execution layer.

✅ Verification successful

Based on the search results, I can now provide a final response about the resource parameter usage:

Resource parameters are properly integrated across the codebase

The parameters threads_per_core and gpus_per_worker are well-integrated throughout the codebase:

  • They are properly propagated through the executor chain from high-level interfaces to low-level implementations
  • Input validation is implemented via check_threads_per_core() and check_gpus_per_worker()
  • The parameters are correctly handled in resource dictionary construction and task execution
  • Documentation and examples demonstrate proper usage patterns
  • Test coverage exists for both parameters in various scenarios
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check how the new resource parameters are used in execute_tasks_h5 and related functions

# Check for threads_per_core usage
echo "Checking threads_per_core usage:"
rg -A 5 "threads_per_core" --type python

# Check for gpus_per_worker usage
echo "Checking gpus_per_worker usage:"
rg -A 5 "gpus_per_worker" --type python

# Check for resource dictionary construction
echo "Checking resource_dict usage:"
rg -A 5 "resource_dict" --type python

Length of output: 428


Script:

#!/bin/bash
# Description: Check how the new resource parameters are used in execute_tasks_h5 and related functions

# Check for threads_per_core usage
echo "Checking threads_per_core usage:"
rg -A 5 "threads_per_core"

# Check for gpus_per_worker usage
echo "Checking gpus_per_worker usage:"
rg -A 5 "gpus_per_worker"

# Check for resource dictionary construction
echo "Checking resource_dict usage:"
rg -A 5 "resource_dict"

# Check the executor class implementation
echo "Checking executor class implementation:"
ast-grep --pattern $'class Executor {
  $$$
}'

Length of output: 70727

tests/test_cache_executor_serial.py (1)

118-118: LGTM!

The resource_dict implementation is consistent with the previous test method.

executorlib/cache/shared.py (2)

54-56: LGTM: Well-structured parameter additions.

The new parameters are properly typed and have sensible defaults. The parameter ordering is logical, grouping compute resources together.


113-113: Verify resource dictionary handling in downstream functions.

The resource dictionary is now passed to both serialize_funct_h5 and execute_function. Let's verify that these functions properly handle the resource parameters.

Also applies to: 127-127

✅ Verification successful

Let me generate additional verification scripts to check the implementation of serialize_funct_h5 which wasn't found in the previous results.


Based on the verification results, I can now generate the final response:

Resource dictionary handling is properly implemented across the execution chain

The resource dictionary is correctly:

  • Accepted as a parameter in serialize_funct_h5 function
  • Handled in the interactive executor's submit method with proper validation (check_resource_dict)
  • Propagated through task execution in executorlib/interactive/shared.py
  • Used in test cases with both empty and specific resource configurations
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check how serialize_funct_h5 and execute_function handle resource_dict
echo "Checking serialize_funct_h5 implementation..."
ast-grep --pattern 'def serialize_funct_h5($$$resource_dict$$$)' -A 10

echo "Checking execute_function implementations..."
rg -l "def.*execute.*function" | xargs rg "resource_dict" -A 5

Length of output: 6921


Script:

#!/bin/bash
# Search for serialize_funct_h5 implementation
echo "Searching for serialize_funct_h5 implementation..."
rg "def serialize_funct_h5" -A 10

# Check for resource_dict usage in cache-related files
echo "Checking resource_dict usage in cache files..."
fd cache --type f | xargs rg "resource_dict" -A 5

Length of output: 4353

Comment on lines 30 to 32
    if resource_dict is None:
        resource_dict = {"cwd": None}
    return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])

⚠️ Potential issue

Add validation for resource dictionary access.

The current implementation might raise KeyError if the provided dictionary doesn't contain the 'cwd' key, and there's no validation of the value type.

Consider adding validation:

     if resource_dict is None:
         resource_dict = {"cwd": None}
+    if "cwd" not in resource_dict:
+        resource_dict["cwd"] = None
+    elif not (resource_dict["cwd"] is None or isinstance(resource_dict["cwd"], str)):
+        raise TypeError("cwd must be None or a string")
     return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    if resource_dict is None:
-        resource_dict = {"cwd": None}
-    return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])
+    if resource_dict is None:
+        resource_dict = {"cwd": None}
+    if "cwd" not in resource_dict:
+        resource_dict["cwd"] = None
+    elif not (resource_dict["cwd"] is None or isinstance(resource_dict["cwd"], str)):
+        raise TypeError("cwd must be None or a string")
+    return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])

@@ -157,7 +157,7 @@ def test_executor_function_dependence_args(self):
"cache_directory": cache_dir,
"execute_function": execute_in_subprocess,
"cores_per_worker": 1,
"cwd": None,
"resource_dict": {"cwd": None},

💡 Codebase verification

Review comment is accurate - resource management test coverage needs improvement

The codebase has limited resource management testing:

  • Most tests only use empty resource dictionaries {} or basic {"cwd": None}
  • test_shared_input_check.py only validates input types/formats
  • test_dependencies_executor.py has basic core allocation tests
  • No comprehensive tests for:
    • Thread and GPU resource allocation behaviors
    • Resource constraint handling
    • Error cases with invalid configurations
    • Resource cleanup and release
    • Resource contention scenarios
🔗 Analysis chain

LGTM! Consider adding resource management integration tests.

The resource_dict implementation is consistent. However, since this PR introduces significant resource management changes, consider adding integration tests that verify:

  1. Resource allocation behavior with different thread and GPU configurations
  2. Resource constraints and their impact on task execution
  3. Error handling for invalid resource configurations

Let's check if there are any existing resource management tests:


Let's examine the actual test coverage of resource management in these files.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing resource management tests
rg -l "threads_per_core|gpus_per_worker|resource_dict" "tests/"

Length of output: 310


Script:

#!/bin/bash
# Check resource management test coverage
rg -A 5 "resource_dict|threads_per_core|gpus_per_worker" "tests/" --type py

Length of output: 13295

Comment on lines 100 to 108
resource_dict = task_dict["resource_dict"]
if "cores" not in resource_dict:
    resource_dict["cores"] = cores_per_worker
if "gpus_per_core" not in resource_dict:
    resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
if "threads_per_core" not in resource_dict:
    resource_dict["threads_per_core"] = threads_per_core
if "cwd" not in resource_dict:
    resource_dict["cwd"] = cwd

⚠️ Potential issue

Add input validation and handle edge cases in resource allocation.

The current resource dictionary construction has several potential issues:

  1. Integer division in gpus_per_worker / cores_per_worker could result in zero GPUs per core when the ratio is non-zero but < 1
  2. No validation of input values
  3. No handling of edge cases

Consider adding these safety checks:

 resource_dict = task_dict["resource_dict"]
+if cores_per_worker <= 0:
+    raise ValueError("cores_per_worker must be positive")
+if gpus_per_worker < 0:
+    raise ValueError("gpus_per_worker cannot be negative")
+if threads_per_core <= 0:
+    raise ValueError("threads_per_core must be positive")
 if "cores" not in resource_dict:
     resource_dict["cores"] = cores_per_worker
 if "gpus_per_core" not in resource_dict:
-    resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
+    # Ceiling division so a non-zero GPU request never rounds down to zero per core
+    resource_dict["gpus_per_core"] = max(1, int((gpus_per_worker + cores_per_worker - 1) // cores_per_worker)) if gpus_per_worker > 0 else 0
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 resource_dict = task_dict["resource_dict"]
+if cores_per_worker <= 0:
+    raise ValueError("cores_per_worker must be positive")
+if gpus_per_worker < 0:
+    raise ValueError("gpus_per_worker cannot be negative")
+if threads_per_core <= 0:
+    raise ValueError("threads_per_core must be positive")
 if "cores" not in resource_dict:
     resource_dict["cores"] = cores_per_worker
 if "gpus_per_core" not in resource_dict:
-    resource_dict["gpus_per_core"] = int(gpus_per_worker / cores_per_worker)
+    # Ceiling division so a non-zero GPU request never rounds down to zero per core
+    resource_dict["gpus_per_core"] = max(1, int((gpus_per_worker + cores_per_worker - 1) // cores_per_worker)) if gpus_per_worker > 0 else 0
 if "threads_per_core" not in resource_dict:
     resource_dict["threads_per_core"] = threads_per_core
 if "cwd" not in resource_dict:
     resource_dict["cwd"] = cwd

Contributor

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (1)
executorlib/standalone/cache/spawner.py (1)

9-9: Enhance docstring and type hints for better clarity.

While the change to use resource_dict improves extensibility, the documentation could be more comprehensive.

Consider these improvements:

-    resource_dict: Optional[dict] = None,
+    resource_dict: Optional[dict[str, Optional[str]]] = None,
     resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
                          Example resource dictionary: {
                              cwd: None,
                          }
+                        Keys:
+                            cwd (Optional[str]): Working directory for the subprocess. 
+                                                If None, uses the current working directory.

Also applies to: 17-20

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR, between commits 4f91a27 and a813845.

📒 Files selected for processing (4)
  • executorlib/cache/executor.py (2 hunks)
  • executorlib/cache/shared.py (3 hunks)
  • executorlib/standalone/cache/spawner.py (2 hunks)
  • tests/test_cache_executor_serial.py (0 hunks)
💤 Files with no reviewable changes (1)
  • tests/test_cache_executor_serial.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • executorlib/cache/executor.py
  • executorlib/cache/shared.py
🧰 Additional context used
🪛 Ruff
executorlib/standalone/cache/spawner.py

30-33: Combine if branches using logical or operator

Combine if branches

(SIM114)

Comment on lines +30 to +34
    if resource_dict is None:
        resource_dict = {"cwd": None}
    elif len(resource_dict) == 0:
        resource_dict = {"cwd": None}
    return subprocess.Popen(command, universal_newlines=True, cwd=resource_dict["cwd"])

🛠️ Refactor suggestion

Simplify conditions and future-proof resource handling.

The current implementation can be simplified and made more robust.

Consider these improvements:

-    if resource_dict is None:
-        resource_dict = {"cwd": None}
-    elif len(resource_dict) == 0:
-        resource_dict = {"cwd": None}
+    if resource_dict is None or len(resource_dict) == 0:
+        resource_dict = {"cwd": None}
+    # Create a copy to avoid modifying the input dictionary
+    resource_dict = resource_dict.copy()

Additionally, consider defining a set of supported resource keys as constants to make the code more maintainable as new resource types are added in the future:

SUPPORTED_RESOURCES = {"cwd"}
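A hedged sketch of how such a constant could back a small validation helper (the helper name is hypothetical and not part of the codebase):

SUPPORTED_RESOURCES = {"cwd"}  # extend as thread/GPU keys become supported


def _check_resource_keys(resource_dict: dict) -> None:
    # Hypothetical helper: reject keys the spawner does not understand yet
    unknown = set(resource_dict) - SUPPORTED_RESOURCES
    if unknown:
        raise ValueError(f"Unsupported resource keys: {sorted(unknown)}")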
🧰 Tools
🪛 Ruff

30-33: Combine if branches using logical or operator

Combine if branches

(SIM114)
