Skip to content

[Streams] Allow GroupBy with infinite output substreams #7607

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Apr 28, 2025

Fixes #7514

Changes

  • Allows GroupBy to have infinite substreams by passing negative values to maxSubstreams argument.
  • Add new public Flow and Source fluent API that does not require the maxSubstreams argument.

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-review

@@ -419,9 +419,9 @@ public void OnPush()
}
else
{
if (_activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams)
if (_stage._maxSubstreams > -1 && _activeSubstreams.Count + _closedSubstreams.Count == _stage._maxSubstreams)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual fix. Only check substream overflow condition if _maxSubstream is greater than or equals to 0.

/// </para>
/// See <seealso cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Arkatufus just a general note going forward, just don't have a comment instead of a TBD.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 2eb59c5 into akkadotnet:dev Apr 28, 2025
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Akka.Streams: GroupBy shouldn't require users to pre-calculate max number of groups
2 participants