-
Notifications
You must be signed in to change notification settings - Fork 167
Adds FutureAdapter that delegates executions to a threadpool for parallelization #1264
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Taking inspiration from #1263, I implemented a similar adapter to how async works. We get away with this because we don't encounter SERDE boundaries. If you run the example DAG you'll see that: 1. it is parallelized as it should be 2. you can use caching and the tracking adapter Rough edges: - haven't tested this extensively, but seems to just work. - need to add tests for it & docs, etc.
elijahbenizzy
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mark as experimental IMO
| :return: the result of the execution of the graph. | ||
| """ | ||
| for k, v in outputs.items(): | ||
| if isinstance(v, Future): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you want concurrent.futures.as_completed(...). Can't convince myselfe that we're not deadlocking in certain cases, but I think the fact that we're doing topological order should be good enough...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this matters here. Since this wont block anything executing in a thread...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's IMO slightly cleaner, but yeah, it'll not return until the slowest does regardless. Might also not want to mutate the outputs dictionary (copying is cleaner). But yes, nits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Reviewed everything up to 037c915 in 1 minute and 12 seconds
More details
- Looked at
501lines of code in9files - Skipped
2files when reviewing. - Skipped posting
7drafted comments based on config settings.
1. tests/plugins/test_h_threadpool.py:5
- Draft comment:
Consider adding tests for edge cases and error handling, such as: - Submitting a function that raises an exception.
- Handling of non-Future objects in
build_result. - Using
FutureAdapterwith nomax_workersspecified. - Reason this comment was not posted:
Confidence changes required:50%
The test file is missing tests for edge cases and error handling.
2. examples/parallelism/lazy_threadpool_execution/run.py:11
- Draft comment:
Typo in filename: "my_funtions.png" should be "my_functions.png". - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
While this is a real typo, it's a very minor issue in an example file. The typo would be immediately obvious when the code runs since the generated file would have a misspelled name. It's not a logic error or code quality issue. According to the rules, we shouldn't make comments about obvious or unimportant issues.
The typo could potentially cause confusion for users following the example, and inconsistent filenames across examples might be worth fixing for documentation purposes.
While consistency is good, this is still a very minor cosmetic issue that would be immediately apparent when running the code, and fixing it isn't critical to the code's functionality.
The comment should be deleted as it points out an obvious and unimportant issue that doesn't affect code functionality or quality.
3. examples/parallelism/lazy_threadpool_execution/run.py:20
- Draft comment:
Consider makingproject_idandusernameconfigurable or document that these need to be changed by the user. - Reason this comment was not posted:
Confidence changes required:50%
Therun.pyscript uses a hardcoded project ID and username, which might not be suitable for all users.
4. examples/parallelism/lazy_threadpool_execution/run.py:17
- Draft comment:
Ensure thathamilton_sdkis a required dependency or provide instructions for its installation. - Reason this comment was not posted:
Confidence changes required:50%
Therun.pyscript importsadaptersfromhamilton_sdk, which might not be available in all environments.
5. hamilton/plugins/h_threadpool.py:71
- Draft comment:
Consider adding exception handling for the function execution to prevent unhandled exceptions from crashing the thread pool. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
ThreadPoolExecutor automatically captures exceptions in Future objects, which can be retrieved via Future.exception(). The code already uses Future.result() which will raise any captured exceptions. Adding additional exception handling would be redundant since the executor already provides this functionality. The comment seems to misunderstand how ThreadPoolExecutor works.
Maybe there are specific types of exceptions that should be handled differently or logged in a particular way for this application.
While custom exception handling could be added, it should be driven by specific requirements rather than general concerns about unhandled exceptions, which are already managed by ThreadPoolExecutor.
The comment should be deleted as it suggests adding unnecessary exception handling that's already provided by ThreadPoolExecutor.
6. examples/parallelism/lazy_threadpool_execution/my_functions.py:4
- Draft comment:
Function names likea,b,c, etc., are not descriptive. Consider using more descriptive names to improve code readability. This applies to all similar functions in this file and inmy_functions_async.py. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
Since this is in an examples/parallelism directory, these functions appear to be deliberately simplified to demonstrate a concept about parallel execution. The single-letter names might actually be beneficial here to keep the example focused on the parallelism patterns rather than domain-specific logic. The file name and directory structure suggest this is educational/example code rather than production code.
The comment raises a valid point about code readability in general, but I might be overlooking the pedagogical value of keeping example code extremely simple.
In example code demonstrating technical concepts like parallelism, overly descriptive names could actually detract from the learning objective by adding unnecessary complexity.
Delete the comment. While descriptive names are generally good practice, in this specific context of example code demonstrating parallelism concepts, the simple single-letter names serve a valid pedagogical purpose.
7. examples/parallelism/lazy_threadpool_execution/run.py:20
- Draft comment:
Avoid hardcoding sensitive information likeproject_idandusername. Consider using environment variables or configuration files to manage such data securely. This applies to all similar instances in this file. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
While hardcoded credentials are generally a security concern, this is in an examples directory and appears to be demo code. The project_id is just "21" and has a comment "modify this as needed". The email address appears to be a real email but is likely just for demonstration. Example code often includes placeholder values to show the expected format.
The comment raises a valid security best practice. Hardcoded credentials could be copied by users into production code.
Since this is clearly example code and the values are meant to be replaced, adding configuration management would make the example more complex and harder to understand.
Delete the comment. While the security practice is valid, this is example code where simple placeholder values are more appropriate for demonstration purposes.
Workflow ID: wflow_jgnKlIGIxZMkXrlc
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on cecc050 in 24 seconds
More details
- Looked at
30lines of code in2files - Skipped
0files when reviewing. - Skipped posting
1drafted comments based on config settings.
1. examples/parallelism/lazy_threadpool_execution/run.py:11
- Draft comment:
There's a typo in the filename. It should bemy_functions.pnginstead ofmy_funtions.png. - Reason this comment was not posted:
Comment looked like it was already resolved.
Workflow ID: wflow_7kOHRbqPlOa0bhhM
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on 4ac52f1 in 4 seconds
More details
- Looked at
9lines of code in1files - Skipped
0files when reviewing. - Skipped posting
2drafted comments based on config settings.
1. examples/parallelism/lazy_threadpool_execution/requirements.txt:1
- Draft comment:
It's recommended to specify exact versions for dependencies to ensure reproducibility. - Reason this comment was not posted:
Confidence changes required:50%
The requirements file should specify exact versions to ensure reproducibility.
2. examples/parallelism/lazy_threadpool_execution/requirements.txt:3
- Draft comment:
Remove the empty line at the end of the file for consistency. - Reason this comment was not posted:
Confidence changes required:10%
The requirements file should not have an empty line at the end. This is a minor issue but should be corrected for consistency.
Workflow ID: wflow_TnxeLrcDmtuh7gwY
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.
So that people can adjust the result accordingly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Looks good to me! Incremental review on 4f136d4 in 45 seconds
More details
- Looked at
133lines of code in2files - Skipped
0files when reviewing. - Skipped posting
6drafted comments based on config settings.
1. tests/plugins/test_h_threadpool.py:4
- Draft comment:
The importfrom hamilton import lifecycleis unused and can be removed to clean up the code. - Reason this comment was not posted:
Confidence changes required:10%
The import statement forlifecycleis not used in the test file. Removing it will clean up the code.
2. hamilton/plugins/h_threadpool.py:72
- Draft comment:
Returning[Any]frominput_typesis not very informative. Consider returning an empty list or a more specific type if applicable. - Reason this comment was not posted:
Confidence changes required:50%
Theinput_typesmethod inFutureAdapterreturns a list withAny, which is not very informative. It might be better to return an empty list or a more specific type if possible.
3. hamilton/plugins/h_threadpool.py:80
- Draft comment:
ReturningAnyfromoutput_typeis not very informative. Consider returning a more specific type if applicable. - Reason this comment was not posted:
Confidence changes required:50%
Theoutput_typemethod inFutureAdapterreturnsAnywhenresult_builderis not provided. It might be better to return a more specific type if possible.
4. hamilton/plugins/h_threadpool.py:82
- Draft comment:
Thedo_remote_executemethod inFutureAdapteris not following the single responsibility principle as it both submits a function to the executor and wraps it with_new_fn. Consider splitting this into two methods for clarity. - Reason this comment was not posted:
Comment was on unchanged code.
5. hamilton/plugins/h_threadpool.py:48
- Draft comment:
TheFutureAdapterclass is handling two responsibilities: managing thread execution and building results. Consider separating these concerns to adhere to the Single Responsibility Principle. - Reason this comment was not posted:
Comment was not on a valid diff hunk.
6. hamilton/plugins/h_threadpool.py:64
- Draft comment:
The methodsinput_typesandoutput_typeinFutureAdaptercould have more descriptive names to better convey their purpose. - Reason this comment was not posted:
Decided after close inspection that this draft comment was likely wrong and/or not actionable:
- These method names are part of an interface (ResultBuilder) implementation. 2. The names are clear and descriptive for their purpose. 3. Changing interface method names would break compatibility. 4. The methods have good docstrings explaining their purpose. 5. The names follow standard Python type annotation patterns.
The names could potentially be more specific about their role in the FutureAdapter context. However, they're interface methods.
Interface method names should be consistent across implementations, and these names are already clear and well-documented.
The comment should be deleted as it suggests renaming interface methods that are already well-named and documented.
Workflow ID: wflow_PfXblcHcDhyI1ynw
You can customize Ellipsis with 👍 / 👎 feedback, review rules, user-specific overrides, quiet mode, and more.
Taking inspiration from #1263, I implemented
a similar adapter to how async works.
We get away with this because we don't encounter
SERDE boundaries.
If you run the example DAG you'll see that:
Rough edges:
Changes
How I tested this
See this image from the tracker showing that things are running in parallel:

Notes - TODO:
- [ ] example uses materializers & caching to exercise things.Checklist