# [Feature] Add option to overwrite cache_key #676

## Conversation
## Walkthrough
The function `serialize_funct_h5` in `executorlib/standalone/serialize.py` was updated with an optional `cache_key` parameter that, if provided, is used directly as the `task_key` instead of generating it by serializing and hashing inputs. The `execute_tasks_h5` and `execute_tasks` functions were extended to accept and propagate this `cache_key`. Additionally, `execute_tasks_h5` gained a `disable_dependencies` parameter. Cache file handling now uses a recursive search for cache files. New tests were added to verify cache key functionality and cache file naming. The `dump` function was enhanced to ensure output directories exist before file creation.
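For orientation, a minimal sketch of the selection logic described above, assuming a simplified signature (the real `serialize_funct_h5` may return more state and hash differently):

```python
import hashlib

import cloudpickle


def serialize_funct_h5(fn, fn_args=None, fn_kwargs=None, resource_dict=None, cache_key=None):
    """Sketch: an explicit cache_key becomes the task_key; otherwise hash the inputs."""
    fn_args = fn_args or []
    fn_kwargs = fn_kwargs or {}
    resource_dict = resource_dict or {}
    data = {"fn": fn, "args": fn_args, "kwargs": fn_kwargs}
    if cache_key is not None:
        task_key = cache_key  # used directly; no serialization-based hashing
    else:
        task_key = fn.__name__ + hashlib.md5(cloudpickle.dumps(data)).hexdigest()
    return task_key, data
```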
## Changes
| File(s) | Change Summary |
|------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------|
| executorlib/standalone/serialize.py | Added optional `cache_key` parameter to `serialize_funct_h5`; uses it as `task_key` if provided, else hashes serialized inputs. |
| executorlib/task_scheduler/file/shared.py | Added `disable_dependencies` parameter to `execute_tasks_h5`; extracts `cache_key` from `resource_dict` and passes it to `serialize_funct_h5`; uses `get_cache_files` to check cache files. |
| executorlib/task_scheduler/interactive/shared.py | Added optional `cache_key` parameter to `execute_tasks` and `_execute_task_with_cache`, propagating it to `serialize_funct_h5`; uses `get_cache_files` for cache file existence checks. |
| executorlib/standalone/cache.py                      | Added `get_cache_files` to recursively find cache files (see the sketch after this table); modified `get_cache_data` to use recursive file discovery. |
| executorlib/task_scheduler/file/hdf.py | Modified `dump` to ensure directory exists before opening HDF5 file by creating directories as needed. |
| tests/test_cache_fileexecutor_serial.py | Added `test_executor_mixed_cache_key` to test submitting tasks with explicit `cache_key` in `resource_dict`. |
| tests/test_singlenodeexecutor_cache.py | Added `test_cache_key` to verify cache key usage and filename tagging in cached tasks with explicit `cache_key`. |
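The recursive cache-file search added in `executorlib/standalone/cache.py` could look roughly like this (the `.h5out` suffix is inferred from the review comments below; the actual filter may differ):

```python
import os


def get_cache_files(cache_directory: str) -> list:
    """Sketch: recursively collect HDF5 output files below cache_directory."""
    cache_directory_abs = os.path.abspath(cache_directory)
    return [
        os.path.join(dirpath, file_name)
        for dirpath, _, file_names in os.walk(cache_directory_abs)
        for file_name in file_names
        if file_name.endswith(".h5out")
    ]
```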
## Sequence Diagram(s)
```mermaid
sequenceDiagram
participant Caller
participant serialize_funct_h5
Caller->>serialize_funct_h5: Call with fn, fn_args, fn_kwargs, resource_dict, cache_key (optional)
alt cache_key provided
serialize_funct_h5->>serialize_funct_h5: Use cache_key as task_key
else
serialize_funct_h5->>serialize_funct_h5: Serialize inputs and hash to generate task_key
end
    serialize_funct_h5-->>Caller: Return data dict with task_key
```
**Codecov Report**: All modified and coverable lines are covered by tests ✅

```
@@            Coverage Diff             @@
##             main     #676      +/-   ##
==========================================
+ Coverage   96.78%   96.81%   +0.02%
==========================================
  Files          29       29
  Lines        1306     1317      +11
==========================================
+ Hits         1264     1275      +11
  Misses         42       42
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
🧹 Nitpick comments (1)
executorlib/standalone/serialize.py (1)
**65-76**: Add unit tests to cover the new branch

Codecov flags the newly-introduced `cache_key` branch as uncovered. A minimal test should:

- Call `serialize_funct_h5()` with a dummy `cache_key` in `resource_dict`.
- Assert that the returned `task_key` equals that value (after `str()` coercion, if the previous comment is addressed).
- Verify that the function still behaves identically when `cache_key` is absent (hash path).

This guards against future regressions and keeps overall coverage steady.
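A sketch of such a test, assuming the simplified signature from the walkthrough sketch above (the real function may return additional values):

```python
import unittest

from executorlib.standalone.serialize import serialize_funct_h5


def my_funct(a, b):
    return a + b


class TestCacheKey(unittest.TestCase):
    def test_explicit_cache_key(self):
        # an explicit cache_key should be returned verbatim as the task_key
        task_key, _ = serialize_funct_h5(
            my_funct, fn_args=[1], fn_kwargs={"b": 2}, cache_key="abc"
        )
        self.assertEqual(task_key, "abc")

    def test_hash_path_unchanged(self):
        # without cache_key, the hash-based key must stay deterministic
        key_a, _ = serialize_funct_h5(my_funct, fn_args=[1], fn_kwargs={"b": 2})
        key_b, _ = serialize_funct_h5(my_funct, fn_args=[1], fn_kwargs={"b": 2})
        self.assertEqual(key_a, key_b)
```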
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 66-66: executorlib/standalone/serialize.py#L66
Added line #L66 was not covered by tests
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- executorlib/standalone/serialize.py (1 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
executorlib/standalone/serialize.py
[warning] 66-66: executorlib/standalone/serialize.py#L66
Added line #L66 was not covered by tests
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-openmpi.yml)
- GitHub Check: benchmark (ubuntu-latest, 3.13, .ci_support/environment-mpich.yml)
- GitHub Check: notebooks_integration
executorlib/standalone/serialize.py (Outdated)

```python
    if "cache_key" in resource_dict:
        task_key = resource_dict["cache_key"]
    else:
```
🛠️ Refactor suggestion
**Validate & normalize `cache_key` to guarantee a `str` return type**

`serialize_funct_h5()` is documented (and relied on elsewhere) to return `task_key: str`, yet the new code passes through whatever object happens to be under `resource_dict["cache_key"]` unchanged. If callers accidentally supply an `int`, `UUID`, or any other non-string value, this will violate the contract and break `task_key` consumers later in the pipeline (e.g. when used as an HDF5 node name).

```diff
-    if "cache_key" in resource_dict:
-        task_key = resource_dict["cache_key"]
+    if "cache_key" in resource_dict:
+        task_key_value = resource_dict["cache_key"]
+        if not isinstance(task_key_value, str):
+            task_key_value = str(task_key_value)  # or raise TypeError
+        task_key = task_key_value
```

While you're here, please add a short note to the docstring clarifying the new `cache_key` behaviour and acceptable types.
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```python
    if "cache_key" in resource_dict:
        task_key_value = resource_dict["cache_key"]
        if not isinstance(task_key_value, str):
            task_key_value = str(task_key_value)  # or raise TypeError
        task_key = task_key_value
    else:
        …
```
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 66-66: executorlib/standalone/serialize.py#L66
Added line #L66 was not covered by tests
🤖 Prompt for AI Agents
In executorlib/standalone/serialize.py around lines 65 to 67, the code assigns
task_key directly from resource_dict["cache_key"] without ensuring it is a
string, which can break downstream consumers expecting a string. Modify the code
to explicitly convert the cache_key value to a string before assigning it to
task_key. Additionally, update the serialize_funct_h5() function docstring to
include a note about the cache_key parameter, specifying that it will be
normalized to a string and listing acceptable input types.
@liamhuber After some initial bugs, the CI now seems to pass. Can you check if it resolves your issue?
Actionable comments posted: 2
🔭 Outside diff range comments (1)
executorlib/task_scheduler/file/shared.py (1)
**122-133**: ⚠️ Potential issue: Skip scheduling when dependencies are disabled but Futures are supplied

When `disable_dependencies=True` and future objects are detected, the code sets an exception on the user's future but still schedules the task, resulting in an unnecessary subprocess launch that will almost certainly fail.

```diff
-        if len(future_wait_key_lst) > 0:
-            task_dict["future"].set_exception(
-                ValueError(
-                    "Future objects are not supported as input if disable_dependencies=True."
-                )
-            )
-        task_dependent_lst = []
+        if future_wait_key_lst:
+            task_dict["future"].set_exception(
+                ValueError(
+                    "Future objects are not supported when disable_dependencies=True."
+                )
+            )
+            future_queue.task_done()
+            continue  # Do NOT schedule the invalid task
+        task_dependent_lst = []
```

This avoids wasted work and gives clearer semantics.
🧹 Nitpick comments (3)
tests/test_cache_fileexecutor_serial.py (1)
**45-51**: Consider validating that the custom cache key was actually used

The new test checks that the task completes and returns the correct value, but it does not inspect the cache directory to ensure the file was written under the expected key (`abc`). A quick extra assertion will protect against regressions where `cache_key` is silently ignored.

```diff
@@
     with FileTaskScheduler(execute_function=execute_in_subprocess) as exe:
         fs1 = exe.submit(my_funct, 1, b=2, resource_dict={"cache_key": "abc"})
@@
     self.assertTrue(fs1.done())
+    # Optional: verify the cache folder was created with the explicit key
+    cache_path = os.path.abspath(os.path.join("cache", "abc", "cache.h5out"))
+    self.assertTrue(os.path.exists(cache_path))
```

tests/test_singlenodeexecutor_cache.py (1)
**37-50**: Minor pythonic nit – prefer generator expressions inside `sum()`

The intermediate lists created in the two `sum([...])` calls are unnecessary; using generator expressions is more memory-efficient and makes the intent clearer.

```diff
-        self.assertEqual(sum([c["output"] for c in cache_lst]), sum(result_lst))
+        self.assertEqual(sum(c["output"] for c in cache_lst), sum(result_lst))
@@
-            sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst)
+            sum(sum(c["input_args"][0]) for c in cache_lst), sum(result_lst)
```
🪛 Pylint (3.3.7)
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
[refactor] 49-49: Consider using a generator instead 'sum(sum(c['input_args'][0]) for c in cache_lst)'
(R1728)
executorlib/task_scheduler/file/shared.py (1)
**115-121**: Simplify existence check – avoid two `os.listdir` calls

Listing the whole directory tree on every task iteration is O(n) per call and becomes expensive for large caches. A direct `os.path.exists` on the expected output file is faster and simpler.

```diff
-            if not (
-                task_key in os.listdir(cache_directory)
-                and "cache.h5out"
-                in os.listdir(os.path.join(cache_directory, task_key))
-            ):
+            output_file = os.path.join(cache_directory, task_key, "cache.h5out")
+            if not os.path.exists(output_file):
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- executorlib/standalone/serialize.py (3 hunks)
- executorlib/task_scheduler/file/shared.py (2 hunks)
- executorlib/task_scheduler/interactive/shared.py (6 hunks)
- tests/test_cache_fileexecutor_serial.py (1 hunks)
- tests/test_singlenodeexecutor_cache.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- executorlib/standalone/serialize.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_singlenodeexecutor_cache.py (3)
- executorlib/executor/single.py (1): SingleNodeExecutor (20-180)
- executorlib/executor/base.py (1): submit (53-87)
- executorlib/standalone/cache.py (1): get_cache_data (16-42)
🪛 Pylint (3.3.7)
tests/test_singlenodeexecutor_cache.py
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
[refactor] 49-49: Consider using a generator instead 'sum(sum(c['input_args'][0]) for c in cache_lst)'
(R1728)
🔇 Additional comments (2)
executorlib/task_scheduler/file/shared.py (1)
**105-113**: `cache_key` extraction looks good

The key is removed from the resource dictionary before hashing, preventing it from influencing the function hash. 👍

executorlib/task_scheduler/interactive/shared.py (1)
**135-158**: Duplicate cache key in both `cache_key` arg and `resource_dict`

`serialize_funct_h5` receives `cache_key=cache_key` while the original (possibly unchanged) `"cache_key"` remains inside `resource_dict`. If the serializer also hashes the dictionary, this duplicate information could change the hash or trigger unexpected behaviour.

Recommend removing the key from `task_dict["resource_dict"]` before the call:

```python
res_dict = task_dict.get("resource_dict", {}).copy()
cache_key = res_dict.pop("cache_key", cache_key)
...
    resource_dict=res_dict,
    cache_key=cache_key,
```
tests/test_singlenodeexecutor_cache.py

```python
        cache_lst = get_cache_data(cache_directory=cache_directory)
        for entry in cache_lst:
            self.assertTrue("same" in entry['filename'])
        self.assertEqual(sum([c["output"] for c in cache_lst]), sum(result_lst))
```
🛠️ Refactor suggestion
**Potential false-negative when unrelated cache entries exist**

`assertTrue("same" in entry["filename"])` will fail if any cache entry does not contain "same". Because other tests in this class write to the same `cache_directory`, this assertion will intermittently fail unless `tearDown` always removes all prior files and the OS guarantees ordering.

To reduce brittleness:

```python
self.assertTrue(all("same" in e["filename"] for e in cache_lst))
```

or filter the list for the keys you expect.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
🤖 Prompt for AI Agents
In tests/test_singlenodeexecutor_cache.py around lines 44 to 47, the assertion
self.assertTrue("same" in entry['filename']) can cause false negatives if any
cache entry does not contain "same" due to other tests writing to the same
cache_directory. To fix this, replace the assertion with one that checks all
entries, such as using self.assertTrue(all("same" in e["filename"] for e in
cache_lst)), or filter cache_lst to only include expected entries before
asserting, ensuring the test only verifies relevant cache entries.
executorlib/task_scheduler/interactive/shared.py

```python
    cache_key: Optional[str] = None,
    queue_join_on_shutdown: bool = True,
    **kwargs,
```
🛠️ Refactor suggestion
**Global `cache_key` parameter may clash with per-task keys**

`execute_tasks` now takes a single `cache_key` that is forwarded to every task, overwriting any per-task value that might be present inside `task_dict["resource_dict"]`. This is inconsistent with the file-based scheduler (which honours per-task keys) and can lead to collisions when multiple different tasks are queued.

Consider removing the top-level parameter and relying solely on `resource_dict["cache_key"]`, or merge the two like:

```python
cache_key = task_dict.get("resource_dict", {}).pop("cache_key", None) or cache_key
```
🤖 Prompt for AI Agents
In executorlib/task_scheduler/interactive/shared.py around lines 25 to 27, the
global cache_key parameter is overriding per-task cache_key values, causing
potential collisions. Remove the global cache_key parameter and update the code
to use the cache_key from each task's resource_dict instead, or implement logic
to merge them by prioritizing the per-task cache_key if present, falling back to
the global one only if necessary.
@jan-janssen, brilliant, yes, this works 🚀
One would-be-nice would be path-like treatment of the `cache_key`:

```python
import pandas as pd

from executorlib import SingleNodeExecutor, get_cache_data


def the_task(x, y):
    return x + y


cache_directory = "./cache"
with SingleNodeExecutor(cache_directory=cache_directory) as exe:
    f = exe.submit(the_task, 1, 2, resource_dict={"cache_key": "my_key/subdir"})

cache_lst = get_cache_data(cache_directory=cache_directory)
pd.DataFrame(cache_lst)
```

This nicely creates the nested directory structure. This is merely a nice-to-have though, and if it never materializes I'll still live. And it should anyhow probably be done in a second PR that somehow unifies the optional path-like treatment of the cache keys.
I hope these changes will be easier once #681 is merged, so I marked the pull request as draft for now.
@liamhuber I updated the pull request to support hierarchical keys and added the corresponding tests. Can you quickly validate that these changes are compatible with your interface in pyiron_workflow? Then I will merge them and release a new executorlib version.
Actionable comments posted: 1
♻️ Duplicate comments (1)
tests/test_singlenodeexecutor_cache.py (1)
**37-47**: Assertion can still false-negative when unrelated cache entries slip in

The loop asserts on every entry, which fails as soon as one unrelated file appears (e.g. from a parallel test run). A safer one-liner:

```diff
-        for entry in cache_lst:
-            self.assertTrue("same" in entry['filename'])
+        self.assertTrue(all("same" in e["filename"] for e in cache_lst))
```

This also sidesteps the need for the explicit loop.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
🧹 Nitpick comments (3)
executorlib/task_scheduler/file/hdf.py (1)
**20-23**: Doc-string and path handling slightly out of sync

Nice touch making `dump()` resilient to missing parent directories. Two small follow-ups:

- The doc-string still claims `file_name` is "an absolute path" even though you now transparently accept relative paths – worth relaxing that wording.
- If an empty string is ever passed, `os.path.abspath("")` resolves to the current working directory; `os.makedirs("", …)` would then raise `FileNotFoundError`. A quick guard would avoid surprising callers.

```diff
-    file_name_abs = os.path.abspath(file_name)
+    if not file_name:
+        raise ValueError("file_name must be non-empty.")
+    file_name_abs = os.path.abspath(file_name)
```
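For context, the directory-ensuring pattern under discussion might look like this minimal `dump()` sketch (argument names and the single-group layout are assumptions; the real function writes several groups):

```python
import os

import cloudpickle
import h5py
import numpy as np


def dump(file_name: str, data_dict: dict) -> None:
    """Sketch: create parent directories before opening the HDF5 file for writing."""
    file_name_abs = os.path.abspath(file_name)
    os.makedirs(os.path.dirname(file_name_abs), exist_ok=True)
    with h5py.File(file_name_abs, "a") as hdf:
        for key, value in data_dict.items():
            hdf[key] = np.void(cloudpickle.dumps(value))
```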
executorlib/standalone/cache.py (1)

**16-31**: Graceful handling of a missing cache directory

`get_cache_files()` now walks the directory tree – great! Minor edge case: when `cache_directory` does not exist, `os.walk()` silently returns nothing and callers see an empty list. Down-stream code (e.g. `get_cache_data`) interprets that as "no cached results", which is fine but different from the previous behaviour that raised `FileNotFoundError`.

If you want to preserve the old contract (and catch typos early), consider an explicit existence check:

```diff
+    if not os.path.isdir(cache_directory_abs):
+        raise FileNotFoundError(cache_directory_abs)
```
tests/test_singlenodeexecutor_cache.py (1)

**47-50**: Prefer generator expressions to avoid temporary lists

Pylint hint R1728 applies here as well – no need to materialise an intermediary list:

```diff
-        self.assertEqual(sum([c["output"] for c in cache_lst]), sum(result_lst))
+        self.assertEqual(sum(c["output"] for c in cache_lst), sum(result_lst))
```

The same applies to the next assertion. Purely cosmetic, but it trims memory and keeps the style consistent.
🧰 Tools
🪛 Pylint (3.3.7)
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
[refactor] 49-49: Consider using a generator instead 'sum(sum(c['input_args'][0]) for c in cache_lst)'
(R1728)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- executorlib/standalone/cache.py (2 hunks)
- executorlib/task_scheduler/file/hdf.py (2 hunks)
- executorlib/task_scheduler/file/shared.py (3 hunks)
- executorlib/task_scheduler/interactive/shared.py (7 hunks)
- tests/test_cache_fileexecutor_serial.py (1 hunks)
- tests/test_singlenodeexecutor_cache.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- tests/test_cache_fileexecutor_serial.py
- executorlib/task_scheduler/file/shared.py
- executorlib/task_scheduler/interactive/shared.py
🧰 Additional context used
🧬 Code Graph Analysis (1)
tests/test_singlenodeexecutor_cache.py (3)
- executorlib/executor/single.py (1): SingleNodeExecutor (20-184)
- executorlib/executor/base.py (1): submit (53-87)
- executorlib/standalone/cache.py (1): get_cache_data (33-56)
🪛 Pylint (3.3.7)
tests/test_singlenodeexecutor_cache.py
[refactor] 47-47: Consider using a generator instead 'sum(c['output'] for c in cache_lst)'
(R1728)
[refactor] 49-49: Consider using a generator instead 'sum(sum(c['input_args'][0]) for c in cache_lst)'
(R1728)
executorlib/standalone/cache.py

```python
    for file_name in get_cache_files(cache_directory=cache_directory):
        with h5py.File(file_name, "r") as hdf:
            file_content_dict = {
                key: cloudpickle.loads(np.void(hdf["/" + key]))
                for key in group_dict.values()
                if key in hdf
            }
            file_content_dict["filename"] = file_name
            file_lst.append(file_content_dict)
```
🛠️ Refactor suggestion
**I/O errors during cache scan bubble up unhandled**

While iterating over the returned file list, any corrupted or concurrently-written HDF5 file will raise inside `h5py.File(…)`, aborting the whole scan. Depending on the executor's tolerance for partial cache availability, you may want to continue past unreadable files:

```python
for file_name in get_cache_files(cache_directory):
    try:
        with h5py.File(file_name, "r") as hdf:
            ...
    except (OSError, IOError) as exc:
        # log.debug("Skipping unreadable cache file %s: %s", file_name, exc)
        continue
```
🤖 Prompt for AI Agents
In executorlib/standalone/cache.py around lines 47 to 55, the code currently
opens HDF5 files without handling I/O errors, causing the entire cache scan to
abort if a file is corrupted or concurrently written. To fix this, wrap the
h5py.File opening and reading logic inside a try-except block that catches
OSError and IOError exceptions. On catching these exceptions, log a debug
message indicating the unreadable file and continue to the next file, allowing
partial cache availability without aborting the scan.