💡 Implement automatic stream implementation selection #102
…elism recommendation
Reviewer's Guide
Implements dynamic switching between sequential and parallel stream implementations by adding automatic selection logic in StreamConverter, extending BaseStream with recommendation and explicit-control methods, integrating this into terminal execution, and strengthening generator support and test coverage.
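The flow summarized above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not pystreamapi's code: SketchStream, set_parallel, to_list and the THRESHOLD constant are hypothetical names, and the sketch only flips a flag where the library would convert to a parallel stream.

```python
from collections.abc import Sized
from functools import wraps

THRESHOLD = 3000  # assumption: mirrors the literal used in the reviewed diff


class SketchStream:
    """Hypothetical stand-in for a stream; not pystreamapi's BaseStream."""

    def __init__(self, source):
        self.source = source
        self.filters = []
        self.parallel = False
        self.implementation_explicit = False

    def filter(self, predicate):
        self.filters.append(predicate)
        return self

    def set_parallel(self, parallel):
        # An explicit choice by the caller disables automatic switching.
        self.parallel = parallel
        self.implementation_explicit = True
        return self

    def is_parallelism_recommended(self):
        # Only sized sources can be measured without consuming them.
        return (isinstance(self.source, Sized)
                and bool(self.filters)
                and len(self.source) > THRESHOLD)


def choose_implementation(stream):
    """Switch to the parallel flavour only if the caller has not decided."""
    if not stream.implementation_explicit and stream.is_parallelism_recommended():
        stream.parallel = True
    return stream


def terminal(func):
    """Run the selection step right before a terminal operation executes."""
    @wraps(func)
    def wrapper(stream, *args, **kwargs):
        return func(choose_implementation(stream), *args, **kwargs)
    return wrapper


@terminal
def to_list(stream):
    # The sketch evaluates sequentially and only records which flavour was chosen.
    return [x for x in stream.source if all(p(x) for p in stream.filters)]
```

Because the selection runs inside the terminal decorator, the decision is deferred until the whole pipeline is known, and a caller who has chosen an implementation explicitly is never overridden.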
Hey @garlontas - I've reviewed your changes and they look great!
Here's what I looked at during the review
- 🟡 General issues: 2 issues found
- 🟡 Testing: 1 issue found
- 🟢 Documentation: all looks good
    if isinstance(self._source, Sized):
        for item in self._queue.get_queue():
            if item.has_name(self._filter) and len(self._source) > 3000:
                return True
suggestion: Extract the hardcoded 3000 threshold into a constant or config
Defining a named constant or making this value configurable will improve code clarity and maintainability.
    -if isinstance(self._source, Sized):
    -    for item in self._queue.get_queue():
    -        if item.has_name(self._filter) and len(self._source) > 3000:
    -            return True
    +PARALLELISM_RECOMMENDATION_THRESHOLD = 3000
    +if isinstance(self._source, Sized):
    +    for item in self._queue.get_queue():
    +        if item.has_name(self._filter) and len(self._source) > PARALLELISM_RECOMMENDATION_THRESHOLD:
    +            return True
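One way to cover the "or config" half of this suggestion is to keep the named constant as a default and let callers override it. The helper below is a hypothetical sketch, not part of pystreamapi; its name, the has_filter flag and the threshold parameter are assumptions made for illustration.

```python
from collections.abc import Sized

PARALLELISM_RECOMMENDATION_THRESHOLD = 3000  # default taken from the suggestion above


def is_parallelism_recommended(source, has_filter,
                               threshold=PARALLELISM_RECOMMENDATION_THRESHOLD):
    """Hypothetical helper: recommend parallelism for large, filtered, sized sources."""
    # len() is only evaluated when the source is Sized, thanks to short-circuiting.
    return isinstance(source, Sized) and has_filter and len(source) > threshold
```

A caller could then pass, for example, threshold=100 in a test without touching the module-level default.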
    def test_parallelization_recommended(self):
        stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
        self.assertTrue(stream._is_parallelism_recommended())
suggestion (testing): Test _is_parallelism_recommended behavior with a non-Sized stream source.
Please add a test using a generator as the source to confirm that _is_parallelism_recommended returns False when the source is not Sized.
    -def test_parallelization_recommended(self):
    -    stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
    -    self.assertTrue(stream._is_parallelism_recommended())
    +def test_parallelization_recommended(self):
    +    stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
    +    self.assertTrue(stream._is_parallelism_recommended())
    +def test_parallelization_not_recommended_with_generator(self):
    +    def gen():
    +        for i in range(4000):
    +            yield i
    +    stream = Stream.of(gen()).filter(lambda x: x % 2 == 0)
    +    self.assertFalse(stream._is_parallelism_recommended())
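The reason a generator should fall on the False side can be checked with the standard library alone. The snippet below uses only collections.abc.Sized and does not touch pystreamapi; the variable names are illustrative.

```python
from collections.abc import Sized


def gen():
    for i in range(4000):
        yield i


# range and list define __len__, so they count as Sized.
range_is_sized = isinstance(range(4000), Sized)
list_is_sized = isinstance([1, 2, 3], Sized)

# A generator object has no __len__, so its length cannot be known
# without consuming it, and the isinstance check fails.
generator_is_sized = isinstance(gen(), Sized)

print(range_is_sized, list_is_sized, generator_is_sized)  # True True False
```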
pystreamapi/__stream_converter.py
Outdated
    if not stream._implementation_explicit:
        if stream._is_parallelism_recommended():
            return StreamConverter.to_parallel_stream(stream)
suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)
    -if not stream._implementation_explicit:
    -    if stream._is_parallelism_recommended():
    -        return StreamConverter.to_parallel_stream(stream)
    +if not stream._implementation_explicit and stream._is_parallelism_recommended():
    +    return StreamConverter.to_parallel_stream(stream)
Explanation
Too much nesting can make code difficult to understand, and this is especially true in Python, where there are no brackets to help out with the delineation of different nesting levels.
Reading deeply nested code is confusing, since you have to keep track of which conditions relate to which levels. We therefore strive to reduce nesting where possible, and the situation where two if conditions can be combined using and is an easy win.
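The merge is safe here because Python's and short-circuits: the right-hand call is skipped whenever the left-hand condition is false, exactly as in the nested form. The sketch below demonstrates this with a hypothetical FakeStream that counts calls; the class, the function names and the "unchanged" fallback are assumptions for illustration, not pystreamapi code.

```python
class FakeStream:
    """Hypothetical stand-in that counts how often the recommendation is queried."""

    def __init__(self, explicit, recommended):
        self.explicit = explicit
        self.recommended = recommended
        self.calls = 0

    def is_recommended(self):
        self.calls += 1
        return self.recommended


def nested(stream):
    if not stream.explicit:
        if stream.is_recommended():
            return "parallel"
    return "unchanged"


def merged(stream):
    # `and` evaluates is_recommended() only when the first operand is true.
    if not stream.explicit and stream.is_recommended():
        return "parallel"
    return "unchanged"
```

Both functions return the same result for every combination of the two flags, and neither queries the recommendation when the implementation was chosen explicitly.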
This pull request introduces enhancements to the pystreamapi library, focusing on stream implementation flexibility, parallelization recommendations, and improved testing coverage. The key changes include adding functionality for automatic stream implementation selection, refining parallelism recommendations, and ensuring robust handling of generators.
Stream Implementation Enhancements:
- Added choose_implementation method to StreamConverter for automatic selection between sequential and parallel stream implementations based on parallelism recommendations. (pystreamapi/__stream_converter.py)
- Added _is_parallelism_recommended and _set_implementation_explicit methods in BaseStream to determine and control stream implementation switching. (pystreamapi/_streams/__base_stream.py)
Parallelization Logic:
- Updated terminal decorator to integrate automatic implementation selection during stream operations. (pystreamapi/_streams/__base_stream.py)
- … (tests/_streams/test_base_stream.py)
Generator Handling:
- … (tests/_streams/test_stream_implementation.py)
Testing Enhancements:
- Tests for the choose_implementation method, including integration tests with stream operations and numeric streams. (tests/_streams/test_stream_converter.py)
- … (tests/_streams/test_base_stream.py, tests/_streams/test_stream_converter.py)
Summary by Sourcery
Introduce automatic selection between sequential and parallel stream implementations based on runtime recommendations, refine parallelism logic, improve generator compatibility, and expand test coverage.
New Features:
Enhancements:
Tests: