Tip
|
In this section, if an operator is specific to Flux or Mono , it is
prefixed accordingly. Common operators have no prefix. When a specific use case
is covered by a combination of operators, it is presented as a method call, with
leading dot and parameters in parentheses, as follows: .methodCall(parameter) .
|
I want to deal with:
-
that emits a
T
, and I already have:just
-
…from an
Optional<T>
:Mono#justOrEmpty(Optional<T>)
-
…from a potentially
null
T:Mono#justOrEmpty(T)
-
-
that emits a
T
returned by a method:just
as well-
…but lazily captured: use
Mono#fromSupplier
or wrapjust
insidedefer
-
-
that emits several
T
I can explicitly enumerate:Flux#just(T…)
-
that iterates over:
-
an array:
Flux#fromArray
-
a collection or iterable:
Flux#fromIterable
-
a range of integers:
Flux#range
-
a
Stream
supplied for each Subscription:Flux#fromStream(Supplier<Stream>)
-
-
that emits from various single-valued sources such as:
-
a
Supplier<T>
:Mono#fromSupplier
-
a task:
Mono#fromCallable
,Mono#fromRunnable
-
a
CompletableFuture<T>
:Mono#fromFuture
-
-
that completes:
empty
-
that errors immediately:
error
-
…but lazily build the
Throwable
:error(Supplier<Throwable>)
-
-
that never does anything:
never
-
that is decided at subscription:
defer
-
that depends on a disposable resource:
using
-
that generates events programmatically (can use state):
-
synchronously and one-by-one:
Flux#generate
-
asynchronously (can also be sync), multiple emissions possible in one pass:
Flux#create
(Mono#create
as well, without the multiple emission aspect)
-
-
I want to transform existing data:
-
on a 1-to-1 basis (eg. strings to their length):
map
-
…by just casting it:
cast
-
…in order to materialize each source value’s index:
Flux#index
-
-
on a 1-to-n basis (eg. strings to their characters):
flatMap
+ use a factory method -
on a 1-to-n basis with programmatic behavior for each source element and/or state:
handle
-
running an asynchronous task for each source item (eg. urls to http request):
flatMap
+ an asyncPublisher
-returning method-
…ignoring some data: conditionally return a
Mono.empty()
in the flatMap lambda -
…retaining the original sequence order:
Flux#flatMapSequential
(this triggers the async processes immediately but reorders the results) -
…where the async task can return multiple values, from a
Mono
source:Mono#flatMapMany
-
-
-
I want to add pre-set elements to an existing sequence:
-
at the start:
Flux#startWith(T…)
-
at the end:
Flux#concatWith(T…)
-
-
I want to aggregate a
Flux
: (theFlux#
prefix is assumed below)-
into a List:
collectList
,collectSortedList
-
into a Map:
collectMap
,collectMultiMap
-
into an arbitrary container:
collect
-
into the size of the sequence:
count
-
by applying a function between each element (eg. running sum):
reduce
-
…but emitting each intermediary value:
scan
-
-
into a boolean value from a predicate:
-
applied to all values (AND):
all
-
applied to at least one value (OR):
any
-
testing the presence of any value:
hasElements
-
testing the presence of a specific value:
hasElement
-
-
-
I want to combine publishers…
-
in sequential order:
Flux#concat
or.concatWith(other)
-
…but delaying any error until remaining publishers have been emitted:
Flux#concatDelayError
-
…but eagerly subscribing to subsequent publishers:
Flux#mergeSequential
-
-
in emission order (combined items emitted as they come):
Flux#merge
/.mergeWith(other)
-
…with different types (transforming merge):
Flux#zip
/Flux#zipWith
-
-
by pairing values:
-
from 2 Monos into a
Tuple2
:Mono#zipWith
-
from n Monos when they all completed:
Mono#zip
-
-
by coordinating their termination:
-
from 1 Mono and any source into a
Mono<Void>
:Mono#and
-
from n sources when they all completed:
Mono#when
-
into an arbitrary container type:
-
each time all sides have emitted:
Flux#zip
(up to the smallest cardinality) -
each time a new value arrives at either side:
Flux#combineLatest
-
-
-
selecting the first publisher which…
-
produces a value (
onNext
):firstWithValue
-
produces any signal:
firstWithSignal
-
-
triggered by the elements in a source sequence:
switchMap
(each source element is mapped to a Publisher) -
triggered by the start of the next publisher in a sequence of publishers:
switchOnNext
-
-
I want to repeat an existing sequence:
repeat
-
…but at time intervals:
Flux.interval(duration).flatMap(tick → myExistingPublisher)
-
-
I have an empty sequence but…
-
I want a value instead:
defaultIfEmpty
-
I want another sequence instead:
switchIfEmpty
-
-
I have a sequence but I am not interested in values:
ignoreElements
-
…and I want the completion represented as a
Mono
:then
-
…and I want to wait for another task to complete at the end:
thenEmpty
-
…and I want to switch to another
Mono
at the end:Mono#then(mono)
-
…and I want to emit a single value at the end:
Mono#thenReturn(T)
-
…and I want to switch to a
Flux
at the end:thenMany
-
-
I have a Mono for which I want to defer completion…
-
…until another publisher, which is derived from this value, has completed:
Mono#delayUntil(Function)
-
-
I want to expand elements recursively into a graph of sequences and emit the combination…
-
…expanding the graph breadth first:
expand(Function)
-
…expanding the graph depth first:
expandDeep(Function)
-
-
Without modifying the final sequence, I want to:
-
get notified of / execute additional behavior (sometimes referred to as "side-effects") on:
-
emissions:
doOnNext
-
completion:
Flux#doOnComplete
,Mono#doOnSuccess
(includes the result, if any) -
error termination:
doOnError
-
cancellation:
doOnCancel
-
"start" of the sequence:
doFirst
-
this is tied to
Publisher#subscribe(Subscriber)
-
-
post-subscription :
doOnSubscribe
-
as in
Subscription
acknowledgment aftersubscribe
-
this is tied to
Subscriber#onSubscribe(Subscription)
-
-
request:
doOnRequest
-
completion or error:
doOnTerminate
(Mono version includes the result, if any)-
but after it has been propagated downstream:
doAfterTerminate
-
-
any type of signal, represented as a
Signal
:Flux#doOnEach
-
any terminating condition (complete, error, cancel):
doFinally
-
-
log what happens internally:
log
-
-
I want to know of all events:
-
each represented as
Signal
object:-
in a callback outside the sequence:
doOnEach
-
instead of the original onNext emissions:
materialize
-
…and get back to the onNexts:
dematerialize
-
-
-
as a line in a log:
log
-
-
I want to filter a sequence:
-
based on an arbitrary criteria:
filter
-
…that is asynchronously computed:
filterWhen
-
-
restricting on the type of the emitted objects:
ofType
-
by ignoring the values altogether:
ignoreElements
-
by ignoring duplicates:
-
in the whole sequence (logical set):
Flux#distinct
-
between subsequently emitted items (deduplication):
Flux#distinctUntilChanged
-
-
-
I want to keep only a subset of the sequence:
-
by taking N elements:
-
at the beginning of the sequence:
Flux#take(long)
-
…based on a duration:
Flux#take(Duration)
-
…only the first element, as a
Mono
:Flux#next()
-
…using
request(N)
rather than cancellation:Flux#limitRequest(long)
-
-
at the end of the sequence:
Flux#takeLast
-
until a criteria is met (inclusive):
Flux#takeUntil
(predicate-based),Flux#takeUntilOther
(companion publisher-based) -
while a criteria is met (exclusive):
Flux#takeWhile
-
-
by taking at most 1 element:
-
at a specific position:
Flux#elementAt
-
at the end:
.takeLast(1)
-
…and emit an error if empty:
Flux#last()
-
…and emit a default value if empty:
Flux#last(T)
-
-
-
by skipping elements:
-
at the beginning of the sequence:
Flux#skip(long)
-
…based on a duration:
Flux#skip(Duration)
-
-
at the end of the sequence:
Flux#skipLast
-
until a criteria is met (inclusive):
Flux#skipUntil
(predicate-based),Flux#skipUntilOther
(companion publisher-based) -
while a criteria is met (exclusive):
Flux#skipWhile
-
-
by sampling items:
-
by duration:
Flux#sample(Duration)
-
but keeping the first element in the sampling window instead of the last:
sampleFirst
-
-
by a publisher-based window:
Flux#sample(Publisher)
-
based on a publisher "timing out":
Flux#sampleTimeout
(each element triggers a publisher, and is emitted if that publisher does not overlap with the next)
-
-
-
I expect at most 1 element (error if more than one)…
-
and I want an error if the sequence is empty:
Flux#single()
-
and I want a default value if the sequence is empty:
Flux#single(T)
-
and I accept an empty sequence as well:
Flux#singleOrEmpty
-
-
I want to create an erroring sequence:
error
…-
…to replace the completion of a successful
Flux
:.concat(Flux.error(e))
-
…to replace the emission of a successful
Mono
:.then(Mono.error(e))
-
…if too much time elapses between onNexts:
timeout
-
…lazily:
error(Supplier<Throwable>)
-
-
I want the try/catch equivalent of:
-
throwing:
error
-
catching an exception:
-
and falling back to a default value:
onErrorReturn
-
and falling back to another
Flux
orMono
:onErrorResume
-
and wrapping and re-throwing:
.onErrorMap(t → new RuntimeException(t))
-
-
the finally block:
doFinally
-
the using pattern from Java 7:
using
factory method
-
-
I want to recover from errors…
-
by falling back:
-
to a value:
onErrorReturn
-
to a
Publisher
orMono
, possibly different ones depending on the error:Flux#onErrorResume
andMono#onErrorResume
-
-
by retrying…
-
…with a simple policy (max number of attempts):
retry()
,retry(long)
-
…triggered by a companion control Flux:
retryWhen
-
…using a standard backoff strategy (exponential backoff with jitter):
retryWhen(Retry.backoff(…))
(see also other factory methods inRetry
)
-
-
-
I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…
-
by throwing a special
IllegalStateException
:Flux#onBackpressureError
-
by dropping excess values:
Flux#onBackpressureDrop
-
…except the last one seen:
Flux#onBackpressureLatest
-
-
by buffering excess values (bounded or unbounded):
Flux#onBackpressureBuffer
-
…and applying a strategy when bounded buffer also overflows:
Flux#onBackpressureBuffer
with aBufferOverflowStrategy
-
-
-
I want to associate emissions with a timing measured…
-
…with best available precision and versatility of provided data:
timed
-
Timed<T>#elapsed()
forDuration
since lastonNext
-
Timed<T>#timestamp()
forInstant
representation of the epoch timestamp (milliseconds resolution) -
Timed<T>#elapsedSinceSubcription()
forDuration
since subscription (rather than last onNext) -
can have nanoseconds resolution for elapsed `Duration`s
-
-
…as a (legacy)
Tuple2<Long, T>
…-
since last onNext:
elapsed
-
since the dawn of time (well, computer time):
timestamp
-
-
-
I want my sequence to be interrupted if there is too much delay between emissions:
timeout
-
I want to get ticks from a clock, regular time intervals:
Flux#interval
-
I want to emit a single
0
after an initial delay: staticMono.delay
. -
I want to introduce a delay:
-
between each onNext signal:
Mono#delayElement
,Flux#delayElements
-
before the subscription happens:
delaySubscription
-
-
I want to split a
Flux<T>
into aFlux<Flux<T>>
, by a boundary criteria:-
of size:
window(int)
-
…with overlapping or dropping windows:
window(int, int)
-
-
of time
window(Duration)
-
…with overlapping or dropping windows:
window(Duration, Duration)
-
-
of size OR time (window closes when count is reached or timeout elapsed):
windowTimeout(int, Duration)
-
based on a predicate on elements:
windowUntil
-
……emitting the element that triggered the boundary in the next window (
cutBefore
variant):.windowUntil(predicate, true)
-
…keeping the window open while elements match a predicate:
windowWhile
(non-matching elements are not emitted)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher:
window(Publisher)
,windowWhen
-
-
I want to split a
Flux<T>
and buffer elements within boundaries together…-
into
List
:-
by a size boundary:
buffer(int)
-
…with overlapping or dropping buffers:
buffer(int, int)
-
-
by a duration boundary:
buffer(Duration)
-
…with overlapping or dropping buffers:
buffer(Duration, Duration)
-
-
by a size OR duration boundary:
bufferTimeout(int, Duration)
-
by an arbitrary criteria boundary:
bufferUntil(Predicate)
-
…putting the element that triggered the boundary in the next buffer:
.bufferUntil(predicate, true)
-
…buffering while predicate matches and dropping the element that triggered the boundary:
bufferWhile(Predicate)
-
-
driven by an arbitrary boundary represented by onNexts in a control Publisher:
buffer(Publisher)
,bufferWhen
-
-
into an arbitrary "collection" type
C
: use variants likebuffer(int, Supplier<C>)
-
-
I want to split a
Flux<T>
so that element that share a characteristic end up in the same sub-flux:groupBy(Function<T,K>)
TIP: Note that this returns aFlux<GroupedFlux<K, T>>
, each innerGroupedFlux
shares the sameK
key accessible throughkey()
.
Note: all of these methods except Mono#toFuture
will throw an UnsupportedOperatorException
if called from
within a Scheduler
marked as "non-blocking only" (by default parallel()
and single()
).
-
I have a
Flux<T>
and I want to:-
block until I can get the first element:
Flux#blockFirst
-
…with a timeout:
Flux#blockFirst(Duration)
-
-
block until I can get the last element (or null if empty):
Flux#blockLast
-
…with a timeout:
Flux#blockLast(Duration)
-
-
synchronously switch to an
Iterable<T>
:Flux#toIterable
-
synchronously switch to a Java 8
Stream<T>
:Flux#toStream
-
-
I have a
Mono<T>
and I want:-
to block until I can get the value:
Mono#block
-
…with a timeout:
Mono#block(Duration)
-
-
a
CompletableFuture<T>
:Mono#toFuture
-
-
I want to connect multiple
Subscriber
to aFlux
:-
and decide when to trigger the source with
connect()
:publish()
(returns aConnectableFlux
) -
and trigger the source immediately (late subscribers see later data):
share()
-
and permanently connect the source when enough subscribers have registered:
.publish().autoConnect(n)
-
and automatically connect and cancel the source when subscribers go above/below the threshold:
.publish().refCount(n)
-
…but giving a chance for new subscribers to come in before cancelling:
.publish().refCountGrace(n, Duration)
-
-
-
I want to cache data from a
Publisher
and replay it to later subscribers:-
up to
n
elements:cache(int)
-
caching latest elements seen within a
Duration
(Time-To-Live):cache(Duration)
-
…but retain no more than
n
elements:cache(int, Duration)
-
-
but without immediately triggering the source:
Flux#replay
(returns aConnectableFlux
)
-