💡 Implement automatic stream implementation selection #102

Merged
merged 5 commits into from
Jun 5, 2025

garlontas (Member) commented Jun 5, 2025

This pull request introduces enhancements to the pystreamapi library, focusing on stream implementation flexibility, parallelization recommendations, and improved testing coverage. The key changes include adding functionality for automatic stream implementation selection, refining parallelism recommendations, and ensuring robust handling of generators.

Stream Implementation Enhancements:

  • Added choose_implementation method to StreamConverter for automatic selection between sequential and parallel stream implementations based on parallelism recommendations. (pystreamapi/__stream_converter.py)
  • Introduced _is_parallelism_recommended and _set_implementation_explicit methods in BaseStream to determine and control stream implementation switching. (pystreamapi/_streams/__base_stream.py)

Parallelization Logic:

  • Updated terminal decorator to integrate automatic implementation selection during stream operations. (pystreamapi/_streams/__base_stream.py)
  • Added tests to validate parallelization recommendation logic under different scenarios. (tests/_streams/test_base_stream.py)

Generator Handling:

  • Improved handling of infinite and finite generators, ensuring compatibility with sequential streams and adding tests for edge cases. (tests/_streams/test_stream_implementation.py)

Testing Enhancements:

  • Expanded test coverage for the choose_implementation method, including integration tests with stream operations and numeric streams. (tests/_streams/test_stream_converter.py)
  • Added pylint directives to test files for cleaner code analysis. (tests/_streams/test_base_stream.py, tests/_streams/test_stream_converter.py)

Summary by Sourcery

Introduce automatic selection between sequential and parallel stream implementations based on runtime recommendations, refine parallelism logic, improve generator compatibility, and expand test coverage.

New Features:

  • Add StreamConverter.choose_implementation method for dynamic switching between sequential and parallel streams based on recommendation logic.

Enhancements:

  • Implement BaseStream._is_parallelism_recommended to trigger parallelization for large, filtered collections and add _implementation_explicit flag to prevent auto-switching.
  • Integrate automatic implementation selection into the terminal decorator to apply the chosen stream type at operation time.
  • Improve handling of infinite and finite generators by enforcing sequential processing for unsupported parallel contexts.
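
The interplay of the recommendation check, the explicit flag, and the terminal decorator can be illustrated with a small standalone model. This is only a sketch: ToyStream and its mode attribute are hypothetical and are not pystreamapi's classes, and the real choose_implementation converts the stream through StreamConverter rather than flipping an attribute. The 3000 threshold and the "filter is queued" condition are taken from the snippet discussed in the review.

```python
from collections.abc import Sized
from functools import wraps

THRESHOLD = 3000  # value used in the reviewed snippet


def terminal(func):
    """Toy terminal decorator: pick the implementation right before executing."""
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Only switch automatically if the caller did not pin an implementation.
        if not self._implementation_explicit and self._is_parallelism_recommended():
            self.mode = "parallel"
        return func(self, *args, **kwargs)
    return wrapper


class ToyStream:
    """Hypothetical stand-in for a stream; not pystreamapi's BaseStream."""

    def __init__(self, source):
        self._source = source
        self._filters = []
        self._implementation_explicit = False
        self.mode = "sequential"

    def filter(self, predicate):
        self._filters.append(predicate)
        return self

    def sequential(self):
        # An explicit request disables automatic switching.
        self.mode = "sequential"
        self._implementation_explicit = True
        return self

    def _is_parallelism_recommended(self):
        # Recommended only for Sized sources above the threshold with a queued filter.
        return (isinstance(self._source, Sized)
                and bool(self._filters)
                and len(self._source) > THRESHOLD)

    @terminal
    def to_list(self):
        # The toy model only records the mode; the work itself runs sequentially.
        return [x for x in self._source if all(p(x) for p in self._filters)]


auto = ToyStream(range(4000)).filter(lambda x: x % 2 == 0)
auto_result = auto.to_list()      # mode is decided here, at operation time

pinned = ToyStream(range(4000)).sequential().filter(lambda x: x % 2 == 0)
pinned_result = pinned.to_list()  # stays sequential because it was set explicitly
```

In this model the decision is deferred until a terminal operation runs, so every filter queued before that point is taken into account.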

Tests:

  • Expand tests for choose_implementation covering recommendation scenarios, explicit overrides, integration with operations, and numeric streams.
  • Add tests for infinite and finite generator handling across both sequential and parallel streams.
  • Include pylint directives in test files for cleaner code analysis.
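
Why infinite generators call for sequential processing can be shown with plain Python, independent of pystreamapi. The sketch below assumes a lazy, element-by-element pipeline; the function names are illustrative only. A lazy pipeline can stop after a limit, whereas any approach that first has to materialize or measure the whole source would never finish on an infinite generator.

```python
from itertools import count, islice


def infinite_numbers():
    """An infinite generator: 0, 1, 2, ..."""
    yield from count()


def first_even_squares(source, n):
    """Lazily filter and map, then stop after n results."""
    evens = (x for x in source if x % 2 == 0)
    squares = (x * x for x in evens)
    # islice pulls only as many elements as needed, so this terminates.
    return list(islice(squares, n))


result = first_even_squares(infinite_numbers(), 5)
print(result)  # [0, 4, 16, 36, 64]
```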

sourcery-ai bot commented Jun 5, 2025

Reviewer's Guide

Implements dynamic switching between sequential and parallel stream implementations by adding automatic selection logic in StreamConverter, extending BaseStream with recommendation and explicit-control methods, integrating this into terminal execution, and strengthening generator support and test coverage.

File-Level Changes

Change: Introduce dynamic implementation selection in StreamConverter
Details:
  • Add choose_implementation method deciding between parallel and sequential
  • Invoke choose_implementation in terminal wrapper before executing operations
  • Mark streams explicit in to_parallel_stream and to_sequential_stream
Files:
  • pystreamapi/__stream_converter.py
  • pystreamapi/_streams/__base_stream.py

Change: Extend BaseStream with parallelism recommendation and explicit control
Details:
  • Implement _is_parallelism_recommended based on source size and queued filters
  • Introduce _set_implementation_explicit and initialize _implementation_explicit flag
  • Enhance ProcessQueue with has_name for operation identification
Files:
  • pystreamapi/_streams/__base_stream.py
  • pystreamapi/_lazy/process.py

Change: Enhance terminal decorator to trigger implementation switching
Details:
  • Import StreamConverter inside wrapper and call choose_implementation
  • Add _verify_open check before executing queued processes
Files:
  • pystreamapi/_streams/__base_stream.py

Change: Improve generator handling via tests
Details:
  • Add finite_generator helper and refine throwing_generator logic
  • Adjust flat_map and handling tests to enforce sequential() on infinite generators
  • Cover finite vs infinite generator operations in limit/map/filter scenarios
Files:
  • tests/_streams/test_stream_implementation.py

Change: Expand tests for choose_implementation scenarios
Details:
  • Add tests for recommended vs not recommended parallelism
  • Test explicit implementation override behavior
  • Include integration tests with filtering, numeric streams, and sum operations
Files:
  • tests/_streams/test_stream_converter.py

Change: Add pylint directives to test files
Details:
  • Insert pylint disable comments at top of test_base_stream.py
  • Insert pylint disable comments at top of test_stream_converter.py
Files:
  • tests/_streams/test_base_stream.py
  • tests/_streams/test_stream_converter.py

Change: Refactor SequentialStream to use unified terminal decorator
Details:
  • Replace @stream.terminal with imported @Terminal decorator in sequential methods
Files:
  • pystreamapi/_streams/__sequential_stream.py

@sourcery-ai sourcery-ai bot left a comment

Hey @garlontas - I've reviewed your changes and they look great!

Here's what I looked at during the review
  • 🟡 General issues: 2 issues found
  • 🟡 Testing: 1 issue found
  • 🟢 Documentation: all looks good


Comment on lines 95 to 98
    if isinstance(self._source, Sized):
        for item in self._queue.get_queue():
            if item.has_name(self._filter) and len(self._source) > 3000:
                return True

suggestion: Extract the hardcoded 3000 threshold into a constant or config

Defining a named constant or making this value configurable will improve code clarity and maintainability.

Suggested change
-   if isinstance(self._source, Sized):
-       for item in self._queue.get_queue():
-           if item.has_name(self._filter) and len(self._source) > 3000:
-               return True
+   PARALLELISM_RECOMMENDATION_THRESHOLD = 3000
+   if isinstance(self._source, Sized):
+       for item in self._queue.get_queue():
+           if item.has_name(self._filter) and len(self._source) > PARALLELISM_RECOMMENDATION_THRESHOLD:
+               return True
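
One way to follow this suggestion is a module-level constant that also serves as an overridable default. The sketch below is standalone and hypothetical: the function signature and the list of operation names stand in for the stream's queue and are not the library's API. Only the 3000 value, the Sized check, and the strict greater-than comparison come from the snippet above.

```python
from collections.abc import Sized

# Named once at module level so the value is documented and easy to change.
PARALLELISM_RECOMMENDATION_THRESHOLD = 3000


def is_parallelism_recommended(source, queued_operation_names,
                               threshold=PARALLELISM_RECOMMENDATION_THRESHOLD):
    """Recommend parallelism for large Sized sources with a queued filter.

    queued_operation_names is a hypothetical stand-in for the process queue.
    """
    if not isinstance(source, Sized):
        # Length is unknown (e.g. a generator), so make no recommendation.
        return False
    return "filter" in queued_operation_names and len(source) > threshold


print(is_parallelism_recommended(range(4000), ["filter"]))                # True
print(is_parallelism_recommended(range(3000), ["filter"]))                # False
print(is_parallelism_recommended(range(100), ["filter"], threshold=50))   # True
```

Keeping the threshold as a keyword argument with the constant as its default lets tests exercise the boundary without building large inputs.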

Comment on lines +122 to +124
    def test_parallelization_recommended(self):
        stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
        self.assertTrue(stream._is_parallelism_recommended())

suggestion (testing): Test _is_parallelism_recommended behavior with a non-Sized stream source.

Please add a test using a generator as the source to confirm that _is_parallelism_recommended returns False when the source is not Sized.

Suggested change
-   def test_parallelization_recommended(self):
-       stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
-       self.assertTrue(stream._is_parallelism_recommended())
+   def test_parallelization_recommended(self):
+       stream = Stream.of(range(4000)).filter(lambda x: x % 2 == 0)
+       self.assertTrue(stream._is_parallelism_recommended())
+
+   def test_parallelization_not_recommended_with_generator(self):
+       def gen():
+           for i in range(4000):
+               yield i
+       stream = Stream.of(gen()).filter(lambda x: x % 2 == 0)
+       self.assertFalse(stream._is_parallelism_recommended())
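
The premise behind the suggested test can be checked without the library: a generator object does not implement __len__, so it is not an instance of collections.abc.Sized, while range and list are. The variable names below are illustrative only.

```python
from collections.abc import Sized


def gen():
    for i in range(4000):
        yield i


range_is_sized = isinstance(range(4000), Sized)   # range defines __len__
list_is_sized = isinstance([1, 2, 3], Sized)      # list defines __len__
generator_is_sized = isinstance(gen(), Sized)     # generators have no __len__

print(range_is_sized, list_is_sized, generator_is_sized)  # True True False
```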

Comment on lines 49 to 51
    if not stream._implementation_explicit:
        if stream._is_parallelism_recommended():
            return StreamConverter.to_parallel_stream(stream)

suggestion (code-quality): Merge nested if conditions (merge-nested-ifs)

Suggested change
-   if not stream._implementation_explicit:
-       if stream._is_parallelism_recommended():
-           return StreamConverter.to_parallel_stream(stream)
+   if not stream._implementation_explicit and stream._is_parallelism_recommended():
+       return StreamConverter.to_parallel_stream(stream)


Explanation: Too much nesting can make code difficult to understand, and this is especially
true in Python, where there are no brackets to help out with the delineation of
different nesting levels.

Reading deeply nested code is confusing, since you have to keep track of which
conditions relate to which levels. We therefore strive to reduce nesting where
possible, and the situation where two if conditions can be combined using
and is an easy win.
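
That the merged condition behaves the same as the nested one can be confirmed by enumerating all four input combinations. The sketch below uses plain booleans and string results as hypothetical stand-ins for the stream attributes and converter calls; what the real method returns in the fall-through case is not shown in the snippet above, so the "sequential" result is an assumption.

```python
from itertools import product


def nested(explicit, recommended):
    # Original shape: two nested ifs.
    if not explicit:
        if recommended:
            return "parallel"
    return "sequential"  # assumed fall-through result


def merged(explicit, recommended):
    # Suggested shape: one condition joined with `and`.
    if not explicit and recommended:
        return "parallel"
    return "sequential"  # assumed fall-through result


cases = list(product([False, True], repeat=2))
all_equal = all(nested(e, r) == merged(e, r) for e, r in cases)
print(all_equal)  # True
```

Because `and` short-circuits, the recommendation check is still skipped whenever the implementation was set explicitly, just as in the nested version.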

@garlontas garlontas linked an issue Jun 5, 2025 that may be closed by this pull request

sonarqubecloud bot commented Jun 5, 2025

@garlontas garlontas merged commit 15083ad into main Jun 5, 2025
6 checks passed
@garlontas garlontas deleted the feature/automatically-select-stream-implementation branch June 5, 2025 15:07
Development

Successfully merging this pull request may close these issues.

Stream.of(...) consumes the underlying iterable eagerly
1 participant