1.x: enable operator/source fusion by named operator lifter #3506
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This change factors out the body of
lift()
into a named class that gives access to the operator and source parameters. By using this information, other operators can perform what I call operator macro-fusion.One such example with this PR is the repeated use of the operator
mergeWith
which when done in the classical way creates a long linked-list of sources merged in pairs, often leading to stack overflowsand degraded performance. However, if
mergeWith
can see that it is applied to an existing mergeWith, the two operators can use a common list of sources and then turn into a one-level merge() with n + 1sources (the previous graph will then be GC'd). Don't worry, this doesn't destroy the original assembled sequence. For example, given
c = a.mergeWith(b); d = c.mergeWith(e);
both c and d can be freely subscribed to and still do the same thing.Note also that this PR conflicts with PR #3477 since the array-based
merge(from(os))
has a different type.I didn't officially benchmarked this due to the stackoverflow with head. Given the 10002 values merged in the unit test in 34ms (i7 4790, Windows 7 x64, Java 8u66) which yields ~294 kOps/s.