Skip to content

Commit 409a5cf

Browse files
koldatsimonbasle
andauthored
Add mergeComparing as fail-fast alternative to mergeOrdered (#2719)
This commit adds Flux.mergeComparing/mergeComparingWith as well as a mergeComparingDelayError variant. The base operator fails fast if any of its sources errors. This supersedes mergeOrdered, which had the delayError behavior built in (making it inconsistent with other merge operators). As a result, the mergeOrdered operators are now deprecated. Fixes #2640. Co-authored-by: Simon Baslé <sbasle@vmware.com>
1 parent 2e7ec6d commit 409a5cf

File tree

11 files changed

+1283
-1063
lines changed

11 files changed

+1283
-1063
lines changed

reactor-core/src/main/java/reactor/core/publisher/Flux.java

Lines changed: 176 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,13 +1523,123 @@ public static <I> Flux<I> mergeDelayError(int prefetch, Publisher<? extends I>..
15231523
* Instead, this operator considers only one value from each source and picks the
15241524
* smallest of all these values, then replenishes the slot for that picked source.
15251525
* <p>
1526-
* <img class="marble" src="doc-files/marbles/mergeOrderedNaturalOrder.svg" alt="">
1526+
* <img class="marble" src="doc-files/marbles/mergeComparingNaturalOrder.svg" alt="">
15271527
*
15281528
* @param sources {@link Publisher} sources of {@link Comparable} to merge
15291529
* @param <I> a {@link Comparable} merged type that has a {@link Comparator#naturalOrder() natural order}
15301530
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
15311531
*/
15321532
@SafeVarargs
1533+
public static <I extends Comparable<? super I>> Flux<I> mergeComparing(Publisher<? extends I>... sources) {
1534+
return mergeComparing(Queues.SMALL_BUFFER_SIZE, Comparator.naturalOrder(), sources);
1535+
}
1536+
1537+
/**
1538+
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
1539+
* by picking the smallest values from each source (as defined by the provided
1540+
* {@link Comparator}). This is not a {@link #sort(Comparator)}, as it doesn't consider
1541+
* the whole of each sequences.
1542+
* <p>
1543+
* Instead, this operator considers only one value from each source and picks the
1544+
* smallest of all these values, then replenishes the slot for that picked source.
1545+
* <p>
1546+
* <img class="marble" src="doc-files/marbles/mergeComparing.svg" alt="">
1547+
*
1548+
* @param comparator the {@link Comparator} to use to find the smallest value
1549+
* @param sources {@link Publisher} sources to merge
1550+
* @param <T> the merged type
1551+
* @return a merged {@link Flux} that compares latest values from each source, using the
1552+
* smallest value and replenishing the source that produced it
1553+
*/
1554+
@SafeVarargs
1555+
public static <T> Flux<T> mergeComparing(Comparator<? super T> comparator, Publisher<? extends T>... sources) {
1556+
return mergeComparing(Queues.SMALL_BUFFER_SIZE, comparator, sources);
1557+
}
1558+
1559+
/**
1560+
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
1561+
* by picking the smallest values from each source (as defined by the provided
1562+
* {@link Comparator}). This is not a {@link #sort(Comparator)}, as it doesn't consider
1563+
* the whole of each sequences.
1564+
* <p>
1565+
* Instead, this operator considers only one value from each source and picks the
1566+
* smallest of all these values, then replenishes the slot for that picked source.
1567+
* <p>
1568+
* <img class="marble" src="doc-files/marbles/mergeComparing.svg" alt="">
1569+
*
1570+
* @param prefetch the number of elements to prefetch from each source (avoiding too
1571+
* many small requests to the source when picking)
1572+
* @param comparator the {@link Comparator} to use to find the smallest value
1573+
* @param sources {@link Publisher} sources to merge
1574+
* @param <T> the merged type
1575+
* @return a merged {@link Flux} that compares latest values from each source, using the
1576+
* smallest value and replenishing the source that produced it
1577+
*/
1578+
@SafeVarargs
1579+
public static <T> Flux<T> mergeComparing(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) {
1580+
if (sources.length == 0) {
1581+
return empty();
1582+
}
1583+
if (sources.length == 1) {
1584+
return from(sources[0]);
1585+
}
1586+
return onAssembly(new FluxMergeComparing<>(prefetch, comparator, false, sources));
1587+
}
1588+
1589+
/**
1590+
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
1591+
* by picking the smallest values from each source (as defined by the provided
1592+
* {@link Comparator}). This is not a {@link #sort(Comparator)}, as it doesn't consider
1593+
* the whole of each sequences.
1594+
* <p>
1595+
* Instead, this operator considers only one value from each source and picks the
1596+
* smallest of all these values, then replenishes the slot for that picked source.
1597+
* <p>
1598+
* Note that it is delaying errors until all data is consumed.
1599+
* <p>
1600+
* <img class="marble" src="doc-files/marbles/mergeComparing.svg" alt="">
1601+
*
1602+
* @param prefetch the number of elements to prefetch from each source (avoiding too
1603+
* many small requests to the source when picking)
1604+
* @param comparator the {@link Comparator} to use to find the smallest value
1605+
* @param sources {@link Publisher} sources to merge
1606+
* @param <T> the merged type
1607+
* @return a merged {@link Flux} that compares latest values from each source, using the
1608+
* smallest value and replenishing the source that produced it
1609+
*/
1610+
@SafeVarargs
1611+
public static <T> Flux<T> mergeComparingDelayError(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) {
1612+
if (sources.length == 0) {
1613+
return empty();
1614+
}
1615+
if (sources.length == 1) {
1616+
return from(sources[0]);
1617+
}
1618+
return onAssembly(new FluxMergeComparing<>(prefetch, comparator, true, sources));
1619+
}
1620+
1621+
/**
1622+
* Merge data from provided {@link Publisher} sequences into an ordered merged sequence,
1623+
* by picking the smallest values from each source (as defined by their natural order).
1624+
* This is not a {@link #sort()}, as it doesn't consider the whole of each sequences.
1625+
* <p>
1626+
* Instead, this operator considers only one value from each source and picks the
1627+
* smallest of all these values, then replenishes the slot for that picked source.
1628+
* <p>
1629+
* Note that it is delaying errors until all data is consumed.
1630+
* <p>
1631+
* <img class="marble" src="doc-files/marbles/mergeComparingNaturalOrder.svg" alt="">
1632+
*
1633+
* @param sources {@link Publisher} sources of {@link Comparable} to merge
1634+
* @param <I> a {@link Comparable} merged type that has a {@link Comparator#naturalOrder() natural order}
1635+
* @return a merged {@link Flux} that compares latest values from each source, using the
1636+
* smallest value and replenishing the source that produced it
1637+
* @deprecated Use {@link #mergeComparingDelayError(int, Comparator, Publisher[])} instead
1638+
* (as {@link #mergeComparing(Publisher[])} don't have this operator's delayError behavior).
1639+
* To be removed in 3.6.0 at the earliest.
1640+
*/
1641+
@SafeVarargs
1642+
@Deprecated
15331643
public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<? extends I>... sources) {
15341644
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, Comparator.naturalOrder(), sources);
15351645
}
@@ -1543,14 +1653,21 @@ public static <I extends Comparable<? super I>> Flux<I> mergeOrdered(Publisher<?
15431653
* Instead, this operator considers only one value from each source and picks the
15441654
* smallest of all these values, then replenishes the slot for that picked source.
15451655
* <p>
1546-
* <img class="marble" src="doc-files/marbles/mergeOrdered.svg" alt="">
1656+
* Note that it is delaying errors until all data is consumed.
1657+
* <p>
1658+
* <img class="marble" src="doc-files/marbles/mergeComparing.svg" alt="">
15471659
*
15481660
* @param comparator the {@link Comparator} to use to find the smallest value
15491661
* @param sources {@link Publisher} sources to merge
15501662
* @param <T> the merged type
1551-
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
1663+
* @return a merged {@link Flux} that compares latest values from each source, using the
1664+
* smallest value and replenishing the source that produced it
1665+
* @deprecated Use {@link #mergeComparingDelayError(int, Comparator, Publisher[])} instead
1666+
* (as {@link #mergeComparing(Publisher[])} don't have this operator's delayError behavior).
1667+
* To be removed in 3.6.0 at the earliest.
15521668
*/
15531669
@SafeVarargs
1670+
@Deprecated
15541671
public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publisher<? extends T>... sources) {
15551672
return mergeOrdered(Queues.SMALL_BUFFER_SIZE, comparator, sources);
15561673
}
@@ -1564,24 +1681,31 @@ public static <T> Flux<T> mergeOrdered(Comparator<? super T> comparator, Publish
15641681
* Instead, this operator considers only one value from each source and picks the
15651682
* smallest of all these values, then replenishes the slot for that picked source.
15661683
* <p>
1567-
* <img class="marble" src="doc-files/marbles/mergeOrdered.svg" alt="">
1684+
* Note that it is delaying errors until all data is consumed.
1685+
* <p>
1686+
* <img class="marble" src="doc-files/marbles/mergeComparing.svg" alt="">
15681687
*
15691688
* @param prefetch the number of elements to prefetch from each source (avoiding too
15701689
* many small requests to the source when picking)
15711690
* @param comparator the {@link Comparator} to use to find the smallest value
15721691
* @param sources {@link Publisher} sources to merge
15731692
* @param <T> the merged type
1574-
* @return a merged {@link Flux} that , subscribing early but keeping the original ordering
1693+
* @return a merged {@link Flux} that compares latest values from each source, using the
1694+
* smallest value and replenishing the source that produced it
1695+
* @deprecated Use {@link #mergeComparingDelayError(int, Comparator, Publisher[])} instead
1696+
* (as {@link #mergeComparing(Publisher[])} don't have this operator's delayError behavior).
1697+
* To be removed in 3.6.0 at the earliest.
15751698
*/
15761699
@SafeVarargs
1700+
@Deprecated
15771701
public static <T> Flux<T> mergeOrdered(int prefetch, Comparator<? super T> comparator, Publisher<? extends T>... sources) {
15781702
if (sources.length == 0) {
15791703
return empty();
15801704
}
15811705
if (sources.length == 1) {
15821706
return from(sources[0]);
15831707
}
1584-
return onAssembly(new FluxMergeOrdered<>(prefetch, comparator, sources));
1708+
return onAssembly(new FluxMergeComparing<>(prefetch, comparator, true, sources));
15851709
}
15861710

15871711
/**
@@ -6154,22 +6278,64 @@ public final Flux<Signal<T>> materialize() {
61546278
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
61556279
* another source.
61566280
* <p>
6157-
* <img class="marble" src="doc-files/marbles/mergeOrderedWith.svg" alt="">
6281+
* Note that it is delaying errors until all data is consumed.
6282+
* <p>
6283+
* <img class="marble" src="doc-files/marbles/mergeComparingWith.svg" alt="">
61586284
*
61596285
* @param other the {@link Publisher} to merge with
61606286
* @param otherComparator the {@link Comparator} to use for merging
61616287
*
6162-
* @return a new {@link Flux}
6288+
* @return a new {@link Flux} that compares latest values from the given publisher
6289+
* and this flux, using the smallest value and replenishing the source that produced it
6290+
* @deprecated Use {@link #mergeComparingWith(Publisher, Comparator)} instead
6291+
* (with the caveat that it defaults to NOT delaying errors, unlike this operator).
6292+
* To be removed in 3.6.0 at the earliest.
61636293
*/
6294+
@Deprecated
61646295
public final Flux<T> mergeOrderedWith(Publisher<? extends T> other,
61656296
Comparator<? super T> otherComparator) {
6166-
if (this instanceof FluxMergeOrdered) {
6167-
FluxMergeOrdered<T> fluxMerge = (FluxMergeOrdered<T>) this;
6297+
if (this instanceof FluxMergeComparing) {
6298+
FluxMergeComparing<T> fluxMerge = (FluxMergeComparing<T>) this;
61686299
return fluxMerge.mergeAdditionalSource(other, otherComparator);
61696300
}
61706301
return mergeOrdered(otherComparator, this, other);
61716302
}
61726303

6304+
/**
6305+
* Merge data from this {@link Flux} and a {@link Publisher} into a reordered merge
6306+
* sequence, by picking the smallest value from each sequence as defined by a provided
6307+
* {@link Comparator}. Note that subsequent calls are combined, and their comparators are
6308+
* in lexicographic order as defined by {@link Comparator#thenComparing(Comparator)}.
6309+
* <p>
6310+
* The combination step is avoided if the two {@link Comparator Comparators} are
6311+
* {@link Comparator#equals(Object) equal} (which can easily be achieved by using the
6312+
* same reference, and is also always true of {@link Comparator#naturalOrder()}).
6313+
* <p>
6314+
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
6315+
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
6316+
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
6317+
* another source.
6318+
* <p>
6319+
* <img class="marble" src="doc-files/marbles/mergeComparingWith.svg" alt="">
6320+
* <p>
6321+
* mergeComparingWith doesn't delay errors by default, but it will inherit the delayError
6322+
* behavior of a mergeComparingDelayError directly above it.
6323+
*
6324+
* @param other the {@link Publisher} to merge with
6325+
* @param otherComparator the {@link Comparator} to use for merging
6326+
*
6327+
* @return a new {@link Flux} that compares latest values from the given publisher
6328+
* and this flux, using the smallest value and replenishing the source that produced it
6329+
*/
6330+
public final Flux<T> mergeComparingWith(Publisher<? extends T> other,
6331+
Comparator<? super T> otherComparator) {
6332+
if (this instanceof FluxMergeComparing) {
6333+
FluxMergeComparing<T> fluxMerge = (FluxMergeComparing<T>) this;
6334+
return fluxMerge.mergeAdditionalSource(other, otherComparator);
6335+
}
6336+
return mergeComparing(otherComparator, this, other);
6337+
}
6338+
61736339
/**
61746340
* Merge data from this {@link Flux} and a {@link Publisher} into an interleaved merged
61756341
* sequence. Unlike {@link #concatWith(Publisher) concat}, inner sources are subscribed

0 commit comments

Comments
 (0)