Create cache directory #601
Conversation
for more information, see https://pre-commit.ci
Walkthrough
The changes enhance the management of task files within the cache directory by giving each task its own subdirectory, named after its task key.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as execute_tasks_h5
    participant FS as File System
    participant Executor as execute_function
    Caller->>FS: Check if task_key directory exists
    Caller->>FS: Check if "cache.h5out" exists in subdirectory
    alt Not found
        FS-->>Caller: Subdirectory does not exist
        Caller->>FS: Create subdirectory (task_key)
        Caller->>Caller: Update input file path to "cache_directory/task_key/cache.h5in"
        Caller->>Caller: Update output file path to "cache_directory/task_key/cache.h5out"
        Caller->>Executor: Execute function with new cache directory "cache_directory/task_key"
    else Found
        Caller->>Executor: Execute function with existing cache directory
    end
```
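The flow in the diagram can be sketched as follows. This is a minimal sketch only: `run_cached`, the `execute_function` callback, and the exact signatures are stand-ins for illustration, not executorlib's actual API.

```python
import os

def run_cached(cache_directory, task_key, execute_function):
    # Per-task subdirectory: cache_directory/task_key/cache.h5{in,out}
    task_dir = os.path.join(cache_directory, task_key)
    output_file = os.path.join(task_dir, "cache.h5out")
    if not os.path.exists(output_file):
        # Not found: create the subdirectory and execute the task there
        os.makedirs(task_dir, exist_ok=True)
        input_file = os.path.join(task_dir, "cache.h5in")
        execute_function(file_name=input_file, cache_directory=task_dir)
    # Found (or just computed): reuse the cached output
    return output_file
```

On a second call with the same `task_key`, the output file already exists, so the expensive execution step is skipped entirely.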
Actionable comments posted: 2
🧹 Nitpick comments (2)
executorlib/cache/shared.py (2)
114-114: Add a comment explaining the new directory structure
The code creates a new directory structure for caching, where each task has its own subdirectory. It would be helpful to add a comment explaining this new structure and its purpose.
```diff
+# Create a dedicated subdirectory for each task to better organize cache files
 os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
```
142-142: Add backwards compatibility for old cache files
Since this change modifies the location of cache files, users upgrading from a previous version might experience issues with existing cache files.
Consider adding backwards compatibility to check for files in the old location as well:
```diff
 file_name_dict[task_key] = os.path.join(
     cache_directory, task_key, "cache.h5out"
 )
+# Add code to handle existing cache files for backward compatibility
+if not os.path.exists(file_name_dict[task_key]) and os.path.exists(
+    os.path.join(cache_directory, task_key + ".h5out")
+):
+    # Either migrate old files to new structure or add logic to check old locations
```
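The backward-compatibility idea above can also be handled as a one-time migration. The helper below is hypothetical and not part of executorlib; it assumes old flat files are named `<task_key>.h5out` directly inside the cache directory, as described in this comment.

```python
import os
import shutil

def migrate_flat_cache(cache_directory):
    """Move old flat files like '<task_key>.h5out' into '<task_key>/cache.h5out'."""
    moved = 0
    for entry in os.listdir(cache_directory):
        old_path = os.path.join(cache_directory, entry)
        # Only touch plain files that follow the old flat naming scheme
        if not (os.path.isfile(old_path) and entry.endswith(".h5out")):
            continue
        task_key = entry[: -len(".h5out")]
        new_dir = os.path.join(cache_directory, task_key)
        os.makedirs(new_dir, exist_ok=True)
        new_path = os.path.join(new_dir, "cache.h5out")
        # Never overwrite a result already stored in the new layout
        if not os.path.exists(new_path):
            shutil.move(old_path, new_path)
            moved += 1
    return moved
```

Running the migration is idempotent: a second invocation finds no remaining flat files and moves nothing.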
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- executorlib/cache/shared.py (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (14)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: unittest_mpich (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: unittest_old
- GitHub Check: unittest_mpich (ubuntu-latest, 3.13)
- GitHub Check: unittest_mpich (macos-latest, 3.13)
- GitHub Check: unittest_flux_openmpi
- GitHub Check: unittest_flux_mpich
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
```python
os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
file_name = os.path.join(cache_directory, task_key, "cache.h5in")
```
The file structure change is inconsistent with the _check_task_output function
The code changes introduce a new directory structure for caching, where each task has its own subdirectory. While these lines correctly implement the new structure, the _check_task_output function (line 198) still looks for output files in the old location. You need to update the _check_task_output function to work with the new directory structure:
```diff
 def _check_task_output(
     task_key: str, future_obj: Future, cache_directory: str
 ) -> Future:
     """
     Check the output of a task and set the result of the future object if available.

     Args:
         task_key (str): The key of the task.
         future_obj (Future): The future object associated with the task.
         cache_directory (str): The directory where the HDF5 files are stored.

     Returns:
         Future: The updated future object.
     """
-    file_name = os.path.join(cache_directory, task_key + ".h5out")
+    file_name = os.path.join(cache_directory, task_key, "cache.h5out")
     if not os.path.exists(file_name):
         return future_obj
     exec_flag, result = get_output(file_name=file_name)
     if exec_flag:
         future_obj.set_result(result)
     return future_obj
```
Also applies to: 139-139, 142-142
executorlib/cache/shared.py (outdated)
```python
if task_key not in os.listdir(
    cache_directory
) and "cache.h5out" not in os.listdir(
    os.path.join(cache_directory, task_key)
):
```
Fix potential FileNotFoundError in directory check
The condition to check if a task needs to be executed has a potential issue. If task_key doesn't exist in cache_directory, trying to access os.listdir(os.path.join(cache_directory, task_key)) will raise a FileNotFoundError because the directory doesn't exist yet.
Replace the current check with:
```diff
-if task_key not in os.listdir(
-    cache_directory
-) and "cache.h5out" not in os.listdir(
-    os.path.join(cache_directory, task_key)
-):
+if not os.path.exists(os.path.join(cache_directory, task_key)) or not os.path.exists(os.path.join(cache_directory, task_key, "cache.h5out")):
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
if not os.path.exists(os.path.join(cache_directory, task_key)) or not os.path.exists(
    os.path.join(cache_directory, task_key, "cache.h5out")
):
```
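The difference in behavior is easy to reproduce. This standalone snippet shows why the `os.listdir` check fails on a fresh cache while the `os.path.exists` variant is safe:

```python
import os
import tempfile

cache_directory = tempfile.mkdtemp()
missing = os.path.join(cache_directory, "task_not_yet_created")

# os.listdir raises when the per-task subdirectory does not exist yet ...
try:
    os.listdir(missing)
    raised = False
except FileNotFoundError:
    raised = True

# ... while os.path.exists simply returns False for the missing output file
safe = os.path.exists(os.path.join(missing, "cache.h5out"))

print(raised, safe)  # -> True False
```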
Actionable comments posted: 1
🧹 Nitpick comments (2)
executorlib/standalone/hdf.py (2)
105-117: Consider handling backward compatibility with the old cache structure.
The change from a flat file structure to a nested directory structure might cause problems with existing cache data. Consider adding a migration path or fallback mechanism to handle both formats.

```diff
 def get_cache_data(cache_directory: str) -> list[dict]:
     file_lst = []
     for task_key in os.listdir(cache_directory):
-        file_name = os.path.join(cache_directory, task_key, "cache.h5in")
-        with h5py.File(file_name, "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-        file_content_dict["filename"] = file_name
-        file_lst.append(file_content_dict)
+        task_path = os.path.join(cache_directory, task_key)
+        # Handle new directory-based cache structure
+        if os.path.isdir(task_path):
+            file_name = os.path.join(task_path, "cache.h5in")
+            if os.path.isfile(file_name):
+                try:
+                    with h5py.File(file_name, "r") as hdf:
+                        file_content_dict = {
+                            key: cloudpickle.loads(np.void(hdf["/" + key]))
+                            for key in group_dict.values()
+                            if key in hdf
+                        }
+                    file_content_dict["filename"] = file_name
+                    file_lst.append(file_content_dict)
+                except (IOError, OSError):
+                    continue
+        # Handle old flat cache structure for backward compatibility
+        elif task_key.endswith(".h5in") and os.path.isfile(task_path):
+            try:
+                with h5py.File(task_path, "r") as hdf:
+                    file_content_dict = {
+                        key: cloudpickle.loads(np.void(hdf["/" + key]))
+                        for key in group_dict.values()
+                        if key in hdf
+                    }
+                file_content_dict["filename"] = task_path
+                file_lst.append(file_content_dict)
+            except (IOError, OSError):
+                continue
     return file_lst
```
105-105: Add documentation explaining the new cache directory structure.
The function's docstring should be updated to explain the new directory-based cache structure, helping users understand how cache data is now organized.

```diff
 def get_cache_data(cache_directory: str) -> list[dict]:
+    """
+    Retrieve cache data from the nested directory structure.
+
+    Each task has its own subdirectory named after the task_key within the cache_directory.
+    Inside each task's subdirectory, there is a 'cache.h5in' file containing the task data.
+
+    Args:
+        cache_directory (str): The base directory for the cache
+
+    Returns:
+        list[dict]: A list of dictionaries containing the cache data for each task
+    """
     file_lst = []
     for task_key in os.listdir(cache_directory):
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- executorlib/cache/shared.py (3 hunks)
- executorlib/standalone/hdf.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/cache/shared.py
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.11)
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.12)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (ubuntu-latest, 3.13)
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
executorlib/standalone/hdf.py (outdated)
```python
for task_key in os.listdir(cache_directory):
    file_name = os.path.join(cache_directory, task_key, "cache.h5in")
    with h5py.File(file_name, "r") as hdf:
```
🛠️ Refactor suggestion
Add error handling for directory and file access.
The code now assumes that all entries in the cache directory are valid subdirectories and that each contains a "cache.h5in" file. This could lead to errors if:
- Any entry in the cache directory is a file rather than a directory
- The "cache.h5in" file doesn't exist in a subdirectory
- Insufficient permissions to access directories or files
```diff
 def get_cache_data(cache_directory: str) -> list[dict]:
     file_lst = []
     for task_key in os.listdir(cache_directory):
-        file_name = os.path.join(cache_directory, task_key, "cache.h5in")
-        with h5py.File(file_name, "r") as hdf:
-            file_content_dict = {
-                key: cloudpickle.loads(np.void(hdf["/" + key]))
-                for key in group_dict.values()
-                if key in hdf
-            }
-        file_content_dict["filename"] = file_name
-        file_lst.append(file_content_dict)
+        task_dir = os.path.join(cache_directory, task_key)
+        if not os.path.isdir(task_dir):
+            continue
+        file_name = os.path.join(task_dir, "cache.h5in")
+        if not os.path.isfile(file_name):
+            continue
+        try:
+            with h5py.File(file_name, "r") as hdf:
+                file_content_dict = {
+                    key: cloudpickle.loads(np.void(hdf["/" + key]))
+                    for key in group_dict.values()
+                    if key in hdf
+                }
+            file_content_dict["filename"] = file_name
+            file_lst.append(file_content_dict)
+        except (IOError, OSError):
+            # Log or handle the error as appropriate for your application
+            continue
     return file_lst
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def get_cache_data(cache_directory: str) -> list[dict]:
    file_lst = []
    for task_key in os.listdir(cache_directory):
        task_dir = os.path.join(cache_directory, task_key)
        if not os.path.isdir(task_dir):
            continue
        file_name = os.path.join(task_dir, "cache.h5in")
        if not os.path.isfile(file_name):
            continue
        try:
            with h5py.File(file_name, "r") as hdf:
                file_content_dict = {
                    key: cloudpickle.loads(np.void(hdf["/" + key]))
                    for key in group_dict.values()
                    if key in hdf
                }
            file_content_dict["filename"] = file_name
            file_lst.append(file_content_dict)
        except (IOError, OSError):
            # Log or handle the error as appropriate for your application
            continue
    return file_lst
```
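The defensive pattern above (skip stray files, skip incomplete task directories) can be exercised without h5py. This sketch keeps only the filesystem checks, with the HDF5 read omitted; the function name is illustrative, not part of executorlib:

```python
import os

def list_cache_inputs(cache_directory):
    # Collect cache.h5in paths, skipping stray files and incomplete task dirs
    found = []
    for task_key in os.listdir(cache_directory):
        task_dir = os.path.join(cache_directory, task_key)
        if not os.path.isdir(task_dir):
            continue  # stray file in the cache root
        file_name = os.path.join(task_dir, "cache.h5in")
        if not os.path.isfile(file_name):
            continue  # task directory without an input file yet
        found.append(file_name)
    return found
```

A naive version that assumed every entry is a complete task directory would raise on the stray file or the empty directory; this one simply skips them.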
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files

```
@@           Coverage Diff            @@
##             main     #601    +/-   ##
========================================
+ Coverage   96.39%   96.40%   +0.01%
========================================
  Files          28       28
  Lines        1248     1252       +4
========================================
+ Hits         1203     1207       +4
  Misses         45       45
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 0
♻️ Duplicate comments (1)
executorlib/standalone/hdf.py (1)
107-118: ⚠️ Potential issue: Add error handling for directory and file access.
The code now assumes that all entries in the cache directory are valid subdirectories and that each contains a "cache.h5out" file. This could lead to errors if:
The code now assumes that all entries in the cache directory are valid subdirectories and that each contains a "cache.h5out" file. This could lead to errors if:
- Any entry in the cache directory is a file rather than a directory
- Insufficient permissions to access directories or files
Additionally, the os.makedirs() call is unnecessary since this function is only reading data, not creating files.

```diff
 def get_cache_data(cache_directory: str) -> list[dict]:
     file_lst = []
     for task_key in os.listdir(cache_directory):
+        task_dir = os.path.join(cache_directory, task_key)
+        if not os.path.isdir(task_dir):
+            continue
         file_name = os.path.join(cache_directory, task_key, "cache.h5out")
-        os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
         if os.path.exists(file_name):
+            try:
                 with h5py.File(file_name, "r") as hdf:
                     file_content_dict = {
                         key: cloudpickle.loads(np.void(hdf["/" + key]))
                         for key in group_dict.values()
                         if key in hdf
                     }
                 file_content_dict["filename"] = file_name
                 file_lst.append(file_content_dict)
+            except (IOError, OSError):
+                continue
     return file_lst
```
🧹 Nitpick comments (2)
executorlib/interactive/shared.py (1)
151-156: Simplified code suggestion for cache file existence check.
The current implementation creates the directory structure and then performs two separate checks to verify if a cache file exists. This can be simplified by using os.path.exists() directly on the constructed file path.

```diff
 os.makedirs(os.path.join(cache_directory, task_key), exist_ok=True)
 file_name = os.path.join(cache_directory, task_key, "cache.h5out")
-if not (
-    task_key in os.listdir(cache_directory)
-    and "cache.h5out" in os.listdir(os.path.join(cache_directory, task_key))
-):
+if not os.path.exists(file_name):
```

tests/test_cache_shared.py (1)
45-46: Consider extracting the file path creation logic to a helper method.
The same file path construction pattern is repeated across multiple test methods. Consider extracting this into a helper method to reduce duplication and make future path format changes easier.

```python
def get_output_file_path(cache_directory, task_key):
    return os.path.join(cache_directory, task_key, "cache.h5out")
```

Then use it in your tests:

```python
self.assertTrue(
    get_runtime(file_name=get_output_file_path(cache_directory, task_key)) > 0.0
)
```

Also applies to: 73-75, 101-103
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- executorlib/interactive/shared.py (1 hunks)
- executorlib/standalone/hdf.py (1 hunks)
- tests/test_cache_shared.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
🔇 Additional comments (1)
tests/test_cache_shared.py (1)
34-36: LGTM on test changes!
The test file modifications correctly update the path structure to match the new nested directory approach for caching. This ensures tests will properly validate the updated implementation.
Also applies to: 62-64, 90-92
Actionable comments posted: 0
🧹 Nitpick comments (1)
executorlib/cache/queue_spawner.py (1)
71-73: Good improvement to dynamically name jobs based on directory context.
The change from a static job name to one derived from the parent directory structure is a nice enhancement that aligns with the PR's objective of improving cache directory management. This will make jobs more identifiable in the queuing system and provide better context when viewing queue status.
However, consider adding a fallback value in case the directory structure doesn't yield a suitable name (e.g., if the path is unusual or empty).
```python
resource_dict["job_name"] = (
    os.path.basename(os.path.dirname(os.path.abspath(cwd))) or "pysqa"
)  # Fallback if basename is empty
```
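The naming rule is simply "basename of the parent directory of cwd": for a working directory like `cache_directory/<task_key>`, the job is named after `cache_directory`. A standalone sketch of that rule (the `derive_job_name` helper and its `fallback` parameter are illustrations, not existing executorlib behavior):

```python
import os

def derive_job_name(cwd, fallback="pysqa"):
    # For cwd = ".../cache_directory/<task_key>", the parent directory's
    # basename becomes the job name; fall back if it comes out empty.
    name = os.path.basename(os.path.dirname(os.path.abspath(cwd)))
    return name or fallback
```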
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- executorlib/cache/queue_spawner.py (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: unittest_win
- GitHub Check: unittest_openmpi (macos-latest, 3.13)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)