apdx-operatorChoice.adoc

Which operator do I need?

Tip
In this section, if an operator is specific to Flux or Mono, it is prefixed accordingly. Common operators have no prefix. When a specific use case is covered by a combination of operators, it is presented as a method call, with leading dot and parameters in parentheses, as follows: .methodCall(parameter).

I want to deal with:

Creating a New Sequence…​

  • that emits a T, and I already have: just

    • …​from an Optional<T>: Mono#justOrEmpty(Optional<T>)

    • …​from a potentially null T: Mono#justOrEmpty(T)

  • that emits a T returned by a method: just as well

    • …​but lazily captured: use Mono#fromSupplier or wrap just inside defer

  • that emits several T I can explicitly enumerate: Flux#just(T…​)

  • that iterates over:

    • an array: Flux#fromArray

    • a collection or iterable: Flux#fromIterable

    • a range of integers: Flux#range

    • a Stream supplied for each Subscription: Flux#fromStream(Supplier<Stream>)

  • that emits from various single-valued sources such as:

    • a Supplier<T>: Mono#fromSupplier

    • a task: Mono#fromCallable, Mono#fromRunnable

    • a CompletableFuture<T>: Mono#fromFuture

  • that completes: empty

  • that errors immediately: error

    • …​but lazily build the Throwable: error(Supplier<Throwable>)

  • that never does anything: never

  • that is decided at subscription: defer

  • that depends on a disposable resource: using

  • that generates events programmatically (can use state):

    • synchronously and one-by-one: Flux#generate

    • asynchronously (can also be sync), multiple emissions possible in one pass: Flux#create (Mono#create as well, without the multiple emission aspect)
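To make the eager versus lazy distinction and the state-carrying form of Flux#generate concrete, here is a minimal sketch. It assumes reactor-core is on the classpath; the class CreatingSequences, the counter CALLS and the helper nextId are hypothetical names invented for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class CreatingSequences {

    // Hypothetical counter standing in for "a method that returns a T".
    static final AtomicInteger CALLS = new AtomicInteger();

    static int nextId() {
        return CALLS.incrementAndGet();
    }

    // Mono.just captures the value once, when the Mono is assembled.
    static Mono<Integer> eager() {
        return Mono.just(nextId());
    }

    // Mono.fromSupplier invokes the supplier again for every subscription.
    static Mono<Integer> lazy() {
        return Mono.fromSupplier(CreatingSequences::nextId);
    }

    // Flux.generate: synchronous, one emission per round, with an Integer state.
    // Assumes n >= 1.
    static Flux<Integer> firstSquares(int n) {
        return Flux.generate(
                () -> 1,                      // initial state
                (i, sink) -> {
                    sink.next(i * i);         // exactly one onNext per invocation
                    if (i >= n) {
                        sink.complete();
                    }
                    return i + 1;             // state for the next round
                });
    }

    public static void main(String[] args) {
        Mono<Integer> eager = eager();
        Mono<Integer> lazy = lazy();
        System.out.println(eager.block() + " " + eager.block());
        System.out.println(lazy.block() + " " + lazy.block());
        System.out.println(firstSquares(4).collectList().block());
    }
}
```

Subscribing twice to the eager Mono yields the same captured value, while the lazy one calls nextId again each time. Wrapping just inside defer would behave like the fromSupplier variant here.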

Transforming an Existing Sequence

  • I want to transform existing data:

    • on a 1-to-1 basis (e.g. strings to their length): map

      • …​by just casting it: cast

      • …​in order to materialize each source value’s index: Flux#index

    • on a 1-to-n basis (e.g. strings to their characters): flatMap + use a factory method

    • on a 1-to-n basis with programmatic behavior for each source element and/or state: handle

    • running an asynchronous task for each source item (e.g. URLs to HTTP requests): flatMap + an async Publisher-returning method

      • …​ignoring some data: conditionally return a Mono.empty() in the flatMap lambda

      • …​retaining the original sequence order: Flux#flatMapSequential (this triggers the async processes immediately but reorders the results)

      • …​where the async task can return multiple values, from a Mono source: Mono#flatMapMany

  • I want to add pre-set elements to an existing sequence:

    • at the start: Flux#startWith(T…​)

    • at the end: Flux#concatWith(T…​)

  • I want to aggregate a Flux: (the Flux# prefix is assumed below)

    • into a List: collectList, collectSortedList

    • into a Map: collectMap, collectMultiMap

    • into an arbitrary container: collect

    • into the size of the sequence: count

    • by applying a function between each element (e.g. running sum): reduce

      • …​but emitting each intermediary value: scan

    • into a boolean value from a predicate:

      • applied to all values (AND): all

      • applied to at least one value (OR): any

      • testing the presence of any value: hasElements

      • testing the presence of a specific value: hasElement

  • I want to combine publishers…​

    • in sequential order: Flux#concat or .concatWith(other)

      • …​but delaying any error until remaining publishers have been emitted: Flux#concatDelayError

      • …​but eagerly subscribing to subsequent publishers: Flux#mergeSequential

    • in emission order (combined items emitted as they come): Flux#merge / .mergeWith(other)

      • …​with different types (transforming merge): Flux#zip / Flux#zipWith

    • by pairing values:

      • from 2 Monos into a Tuple2: Mono#zipWith

      • from n Monos when they all completed: Mono#zip

    • by coordinating their termination:

      • from 1 Mono and any source into a Mono<Void>: Mono#and

      • from n sources when they all completed: Mono#when

      • into an arbitrary container type:

        • each time all sides have emitted: Flux#zip (up to the smallest cardinality)

        • each time a new value arrives at either side: Flux#combineLatest

    • selecting the first publisher which…​

      • produces a value (onNext): firstWithValue

      • produces any signal: firstWithSignal

    • triggered by the elements in a source sequence: switchMap (each source element is mapped to a Publisher)

    • triggered by the start of the next publisher in a sequence of publishers: switchOnNext

  • I want to repeat an existing sequence: repeat

    • …​but at time intervals: Flux.interval(duration).flatMap(tick -> myExistingPublisher)

  • I have an empty sequence but…​

    • I want a value instead: defaultIfEmpty

    • I want another sequence instead: switchIfEmpty

  • I have a sequence but I am not interested in values: ignoreElements

    • …​and I want the completion represented as a Mono: then

    • …​and I want to wait for another task to complete at the end: thenEmpty

    • …​and I want to switch to another Mono at the end: Mono#then(mono)

    • …​and I want to emit a single value at the end: Mono#thenReturn(T)

    • …​and I want to switch to a Flux at the end: thenMany

  • I have a Mono for which I want to defer completion…​

    • …​until another publisher, which is derived from this value, has completed: Mono#delayUntil(Function)

  • I want to expand elements recursively into a graph of sequences and emit the combination…​

    • …​expanding the graph breadth first: expand(Function)

    • …​expanding the graph depth first: expandDeep(Function)
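A few of the transformations listed above can be sketched side by side. This is only an illustration under the assumption that reactor-core is available; the class TransformingSequences and its method names are hypothetical.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class TransformingSequences {

    // 1-to-1: each word becomes its length.
    static Flux<Integer> lengths(Flux<String> words) {
        return words.map(String::length);
    }

    // 1-to-n: each word becomes a small Flux of its characters,
    // built with the Flux.fromArray factory method.
    static Flux<String> letters(Flux<String> words) {
        return words.flatMap(word -> Flux.fromArray(word.split("")));
    }

    // Aggregation: reduce emits only the final sum...
    static Mono<Integer> sum(Flux<Integer> numbers) {
        return numbers.reduce(Integer::sum);
    }

    // ...whereas scan also emits every intermediate value.
    static Flux<Integer> runningSum(Flux<Integer> numbers) {
        return numbers.scan(Integer::sum);
    }

    // Pairing: zip combines elements pairwise and stops with the shorter source.
    static Flux<String> label(Flux<String> names, Flux<Integer> ids) {
        return Flux.zip(names, ids, (name, id) -> name + "#" + id);
    }

    public static void main(String[] args) {
        System.out.println(letters(Flux.just("ab", "c")).collectList().block());
        System.out.println(runningSum(Flux.range(1, 4)).collectList().block());
        System.out.println(label(Flux.just("a", "b", "c"), Flux.just(1, 2)).collectList().block());
    }
}
```

With asynchronous inner publishers, flatMap may interleave results; flatMapSequential is the variant to reach for when the source order must be retained.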

Peeking into a Sequence

  • Without modifying the final sequence, I want to:

    • get notified of / execute additional behavior (sometimes referred to as "side-effects") on:

      • emissions: doOnNext

      • completion: Flux#doOnComplete, Mono#doOnSuccess (includes the result, if any)

      • error termination: doOnError

      • cancellation: doOnCancel

      • "start" of the sequence: doFirst

        • this is tied to Publisher#subscribe(Subscriber)

      • post-subscription: doOnSubscribe

        • as in Subscription acknowledgment after subscribe

        • this is tied to Subscriber#onSubscribe(Subscription)

      • request: doOnRequest

      • completion or error: doOnTerminate (Mono version includes the result, if any)

        • but after it has been propagated downstream: doAfterTerminate

      • any type of signal, represented as a Signal: Flux#doOnEach

      • any terminating condition (complete, error, cancel): doFinally

    • log what happens internally: log

  • I want to know of all events:

    • each represented as a Signal object:

      • in a callback outside the sequence: doOnEach

      • instead of the original onNext emissions: materialize

        • …​and get back to the onNexts: dematerialize

    • as a line in a log: log
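The side-effect operators and materialize can be tried out with a small sketch like the one below. It assumes reactor-core is on the classpath; the class PeekingIntoSequences and the List used as a log are hypothetical stand-ins for real logging.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Signal;

final class PeekingIntoSequences {

    // Records side effects in 'log' without changing what downstream receives.
    static Flux<Integer> observed(Flux<Integer> source, List<String> log) {
        return source
                .doOnNext(value -> log.add("next:" + value))
                .doOnComplete(() -> log.add("complete"))
                // doFinally runs after complete, error or cancel
                .doFinally(type -> log.add("finally:" + type.name()));
    }

    // Turns every event, including the terminal one, into a Signal element.
    static Flux<Signal<Integer>> asSignals(Flux<Integer> source) {
        return source.materialize();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        observed(Flux.just(1, 2), log).blockLast();
        System.out.println(log);
        System.out.println(asSignals(Flux.just(1, 2)).collectList().block());
    }
}
```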

Filtering a Sequence

  • I want to filter a sequence:

    • based on an arbitrary criterion: filter

      • …​that is asynchronously computed: filterWhen

    • restricting on the type of the emitted objects: ofType

    • by ignoring the values altogether: ignoreElements

    • by ignoring duplicates:

      • in the whole sequence (logical set): Flux#distinct

      • between subsequently emitted items (deduplication): Flux#distinctUntilChanged

  • I want to keep only a subset of the sequence:

    • by taking N elements:

      • at the beginning of the sequence: Flux#take(long)

        • …​based on a duration: Flux#take(Duration)

        • …​only the first element, as a Mono: Flux#next()

        • …​using request(N) rather than cancellation: Flux#limitRequest(long)

      • at the end of the sequence: Flux#takeLast

      • until a criterion is met (inclusive): Flux#takeUntil (predicate-based), Flux#takeUntilOther (companion publisher-based)

      • while a criterion is met (exclusive): Flux#takeWhile

    • by taking at most 1 element:

      • at a specific position: Flux#elementAt

      • at the end: .takeLast(1)

        • …​and emit an error if empty: Flux#last()

        • …​and emit a default value if empty: Flux#last(T)

    • by skipping elements:

      • at the beginning of the sequence: Flux#skip(long)

        • …​based on a duration: Flux#skip(Duration)

      • at the end of the sequence: Flux#skipLast

      • until a criterion is met (inclusive): Flux#skipUntil (predicate-based), Flux#skipUntilOther (companion publisher-based)

      • while a criterion is met (exclusive): Flux#skipWhile

    • by sampling items:

      • by duration: Flux#sample(Duration)

        • but keeping the first element in the sampling window instead of the last: sampleFirst

      • by a publisher-based window: Flux#sample(Publisher)

      • based on a publisher "timing out": Flux#sampleTimeout (each element triggers a publisher, and is emitted if that publisher does not overlap with the next)

  • I expect at most 1 element (error if more than one)…​

    • and I want an error if the sequence is empty: Flux#single()

    • and I want a default value if the sequence is empty: Flux#single(T)

    • and I accept an empty sequence as well: Flux#singleOrEmpty
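The differences between some of these filtering operators are easier to see on a tiny input. The following sketch assumes reactor-core is available; the class FilteringSequences and its method names are made up for the example.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class FilteringSequences {

    // Removes duplicates across the whole sequence.
    static Flux<Integer> distinctAll(Flux<Integer> source) {
        return source.distinct();
    }

    // Removes only consecutive repeats.
    static Flux<Integer> distinctRuns(Flux<Integer> source) {
        return source.distinctUntilChanged();
    }

    // Inclusive: the element that satisfies the predicate is still emitted.
    static Flux<Integer> upToLimit(Flux<Integer> source, int limit) {
        return source.takeUntil(i -> i >= limit);
    }

    // Exclusive: the first element that fails the predicate is not emitted.
    static Flux<Integer> belowLimit(Flux<Integer> source, int limit) {
        return source.takeWhile(i -> i < limit);
    }

    // At most one element expected; falls back to a default when empty.
    static Mono<Integer> onlyOrDefault(Flux<Integer> source, int fallback) {
        return source.single(fallback);
    }

    public static void main(String[] args) {
        Flux<Integer> repeats = Flux.just(1, 1, 2, 2, 1);
        System.out.println(distinctAll(repeats).collectList().block());
        System.out.println(distinctRuns(repeats).collectList().block());
        System.out.println(upToLimit(Flux.range(1, 5), 3).collectList().block());
        System.out.println(belowLimit(Flux.range(1, 5), 3).collectList().block());
    }
}
```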

Handling Errors

  • I want to create an erroring sequence: error…​

    • …​to replace the completion of a successful Flux: .concatWith(Flux.error(e))

    • …​to replace the emission of a successful Mono: .then(Mono.error(e))

    • …​if too much time elapses between onNexts: timeout

    • …​lazily: error(Supplier<Throwable>)

  • I want the try/catch equivalent of:

    • throwing: error

    • catching an exception:

      • and falling back to a default value: onErrorReturn

      • and falling back to another Flux or Mono: onErrorResume

      • and wrapping and re-throwing: .onErrorMap(t -> new RuntimeException(t))

    • the finally block: doFinally

    • the using pattern from Java 7: using factory method

  • I want to recover from errors…​

    • by falling back:

      • to a value: onErrorReturn

      • to a Publisher or Mono, possibly different ones depending on the error: Flux#onErrorResume and Mono#onErrorResume

    • by retrying…​

      • …​with a simple policy (max number of attempts): retry(), retry(long)

      • …​triggered by a companion control Flux: retryWhen

      • …​using a standard backoff strategy (exponential backoff with jitter): retryWhen(Retry.backoff(…​)) (see also other factory methods in Retry)

  • I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)…​

    • by throwing a special IllegalStateException: Flux#onBackpressureError

    • by dropping excess values: Flux#onBackpressureDrop

      • …​except the last one seen: Flux#onBackpressureLatest

    • by buffering excess values (bounded or unbounded): Flux#onBackpressureBuffer

      • …​and applying a strategy when bounded buffer also overflows: Flux#onBackpressureBuffer with a BufferOverflowStrategy
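The try/catch analogies and the simple retry policy can be sketched as follows. This assumes reactor-core on the classpath; the class HandlingErrors, the parsing scenario and the flaky task are hypothetical and serve only to trigger errors in a controlled way.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class HandlingErrors {

    // "catch and return a default": the fallback value replaces the error
    // and the sequence then completes.
    static Flux<Integer> parseOrDefault(Flux<String> tokens, int fallback) {
        return tokens.map(Integer::parseInt).onErrorReturn(fallback);
    }

    // "catch and switch to another sequence", only for a given error type.
    static Flux<Integer> parseOrSwitch(Flux<String> tokens, Flux<Integer> backup) {
        return tokens.map(Integer::parseInt)
                .onErrorResume(NumberFormatException.class, e -> backup);
    }

    // "catch, wrap and re-throw".
    static Flux<Integer> parseOrWrap(Flux<String> tokens) {
        return tokens.map(Integer::parseInt)
                .onErrorMap(e -> new IllegalArgumentException("bad token", e));
    }

    // Hypothetical task that fails for the first 'failures' subscriptions.
    static Mono<String> flaky(AtomicInteger attempts, int failures) {
        return Mono.fromCallable(() -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        });
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault(Flux.just("1", "x", "3"), -1).collectList().block());
        AtomicInteger attempts = new AtomicInteger();
        // retry(2) re-subscribes up to two times after an error.
        System.out.println(flaky(attempts, 2).retry(2).block() + " after " + attempts.get() + " attempts");
    }
}
```

Note that the element following the failing one ("3" in the usage above) is never emitted: an error terminates the original sequence, and the fallback only replaces what comes after it.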

Working with Time

  • I want to associate emissions with a timing measured…​

    • …​with best available precision and versatility of provided data: timed

      • Timed<T>#elapsed() for Duration since last onNext

      • Timed<T>#timestamp() for Instant representation of the epoch timestamp (milliseconds resolution)

      • Timed<T>#elapsedSinceSubscription() for Duration since subscription (rather than last onNext)

      • can have nanoseconds resolution for elapsed Durations

    • …​as a (legacy) Tuple2<Long, T>…​

      • since last onNext: elapsed

      • since the dawn of time (well, computer time): timestamp

  • I want my sequence to be interrupted if there is too much delay between emissions: timeout

  • I want to get ticks from a clock, regular time intervals: Flux#interval

  • I want to emit a single 0 after an initial delay: static Mono.delay

  • I want to introduce a delay:

    • between each onNext signal: Mono#delayElement, Flux#delayElements

    • before the subscription happens: delaySubscription
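A short sketch of the time-related operators follows. It assumes reactor-core 3.4 or later (for timed) on the classpath; the class WorkingWithTime and its method names are hypothetical, and the durations used are arbitrary.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Timed;

final class WorkingWithTime {

    // Regular ticks 0, 1, 2, ... limited to 'count' elements.
    static Flux<Long> ticks(Duration period, long count) {
        return Flux.interval(period).take(count);
    }

    // Switches to a fallback value if nothing arrives within 'limit'.
    static Mono<String> withDeadline(Mono<String> slow, Duration limit, String fallback) {
        return slow.timeout(limit, Mono.just(fallback));
    }

    // Time elapsed before each onNext, taken from the Timed wrapper.
    static <T> Flux<Duration> gaps(Flux<T> source) {
        return source.timed().map(Timed::elapsed);
    }

    public static void main(String[] args) {
        System.out.println(ticks(Duration.ofMillis(10), 3).collectList().block());
        System.out.println(withDeadline(Mono.never(), Duration.ofMillis(50), "fallback").block());
        System.out.println(gaps(Flux.just("a", "b")).collectList().block());
    }
}
```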

Splitting a Flux

  • I want to split a Flux<T> into a Flux<Flux<T>>, by a boundary criterion:

    • of size: window(int)

      • …​with overlapping or dropping windows: window(int, int)

    • of time: window(Duration)

      • …​with overlapping or dropping windows: window(Duration, Duration)

    • of size OR time (window closes when count is reached or timeout elapsed): windowTimeout(int, Duration)

    • based on a predicate on elements: windowUntil

      • …​emitting the element that triggered the boundary in the next window (cutBefore variant): .windowUntil(predicate, true)

      • …​keeping the window open while elements match a predicate: windowWhile (non-matching elements are not emitted)

    • driven by an arbitrary boundary represented by onNexts in a control Publisher: window(Publisher), windowWhen

  • I want to split a Flux<T> and buffer elements within boundaries together…​

    • into List:

      • by a size boundary: buffer(int)

        • …​with overlapping or dropping buffers: buffer(int, int)

      • by a duration boundary: buffer(Duration)

        • …​with overlapping or dropping buffers: buffer(Duration, Duration)

      • by a size OR duration boundary: bufferTimeout(int, Duration)

      • by an arbitrary boundary criterion: bufferUntil(Predicate)

        • …​putting the element that triggered the boundary in the next buffer: .bufferUntil(predicate, true)

        • …​buffering while predicate matches and dropping the element that triggered the boundary: bufferWhile(Predicate)

      • driven by an arbitrary boundary represented by onNexts in a control Publisher: buffer(Publisher), bufferWhen

    • into an arbitrary "collection" type C: use variants like buffer(int, Supplier<C>)

  • I want to split a Flux<T> so that elements that share a characteristic end up in the same sub-flux: groupBy(Function<T,K>)

Tip
Note that this returns a Flux<GroupedFlux<K, T>>; each inner GroupedFlux shares the same K key, accessible through key().
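The buffer boundaries and groupBy described above can be compared on the same input with a sketch such as the following. It assumes reactor-core is available; the class SplittingSequences and its method names are hypothetical, and Tuples is used only as a convenient pair type.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

final class SplittingSequences {

    // Size boundary: lists of at most 'size' elements.
    static Flux<List<Integer>> bySize(Flux<Integer> source, int size) {
        return source.buffer(size);
    }

    // Predicate boundary: the multiple of n closes the buffer it belongs to.
    static Flux<List<Integer>> cutAfterMultiples(Flux<Integer> source, int n) {
        return source.bufferUntil(i -> i % n == 0);
    }

    // cutBefore variant: the multiple of n opens the next buffer instead.
    static Flux<List<Integer>> cutBeforeMultiples(Flux<Integer> source, int n) {
        return source.bufferUntil(i -> i % n == 0, true);
    }

    // bufferWhile: the multiple of n closes the buffer and is dropped.
    static Flux<List<Integer>> withoutMultiples(Flux<Integer> source, int n) {
        return source.bufferWhile(i -> i % n != 0);
    }

    // groupBy: one GroupedFlux per key; each group is collected into a list
    // and the results are gathered into a Map keyed by group.key().
    static Mono<Map<Boolean, List<Integer>>> byParity(Flux<Integer> source) {
        return source.groupBy(i -> i % 2 == 0)
                .flatMap(group -> group.collectList()
                        .map(items -> Tuples.of(group.key(), items)))
                .collectMap(Tuple2::getT1, Tuple2::getT2);
    }

    public static void main(String[] args) {
        Flux<Integer> numbers = Flux.range(1, 7);
        System.out.println(bySize(numbers, 3).collectList().block());
        System.out.println(cutBeforeMultiples(numbers, 3).collectList().block());
        System.out.println(withoutMultiples(numbers, 3).collectList().block());
        System.out.println(byParity(numbers).block());
    }
}
```

The window operators follow the same boundary rules but emit each slice as an inner Flux rather than a List.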

Going Back to the Synchronous World

Note: all of these methods except Mono#toFuture will throw an IllegalStateException if called from within a Scheduler marked as "non-blocking only" (by default parallel() and single()).

  • I have a Flux<T> and I want to:

    • block until I can get the first element: Flux#blockFirst

      • …​with a timeout: Flux#blockFirst(Duration)

    • block until I can get the last element (or null if empty): Flux#blockLast

      • …​with a timeout: Flux#blockLast(Duration)

    • synchronously switch to an Iterable<T>: Flux#toIterable

    • synchronously switch to a Java 8 Stream<T>: Flux#toStream

  • I have a Mono<T> and I want:

    • to block until I can get the value: Mono#block

      • …​with a timeout: Mono#block(Duration)

    • a CompletableFuture<T>: Mono#toFuture
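For completeness, the blocking bridges can be sketched as below, assuming reactor-core is on the classpath and the calls are made from a thread that is allowed to block (such as main). The class BackToSynchronous and its method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class BackToSynchronous {

    // Blocks until the first element arrives (null if the Flux is empty).
    static Integer first(Flux<Integer> source) {
        return source.blockFirst();
    }

    // Blocks until completion and returns the last element (null if empty).
    static Integer last(Flux<Integer> source) {
        return source.blockLast();
    }

    // Consumes the Flux through a blocking java.util.stream.Stream.
    static List<Integer> viaStream(Flux<Integer> source) {
        try (Stream<Integer> stream = source.toStream()) {
            return stream.collect(Collectors.toList());
        }
    }

    // Does not block by itself; the caller decides when to wait on the future.
    static CompletableFuture<String> asFuture(Mono<String> source) {
        return source.toFuture();
    }

    public static void main(String[] args) {
        System.out.println(first(Flux.just(1, 2, 3)));
        System.out.println(last(Flux.just(1, 2, 3)));
        System.out.println(viaStream(Flux.just(1, 2, 3)));
        System.out.println(asFuture(Mono.just("done")).join());
    }
}
```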

Multicasting a Flux to several Subscribers

  • I want to connect multiple Subscribers to a Flux:

    • and decide when to trigger the source with connect(): publish() (returns a ConnectableFlux)

    • and trigger the source immediately (late subscribers see later data): share()

    • and permanently connect the source when enough subscribers have registered: .publish().autoConnect(n)

    • and automatically connect and cancel the source when subscribers go above/below the threshold: .publish().refCount(n)

      • …​but giving a chance for new subscribers to come in before cancelling: .publish().refCount(n, Duration)

  • I want to cache data from a Publisher and replay it to later subscribers:

    • up to n elements: cache(int)

    • caching latest elements seen within a Duration (Time-To-Live): cache(Duration)

      • …​but retain no more than n elements: cache(int, Duration)

    • but without immediately triggering the source: Flux#replay (returns a ConnectableFlux)
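One way to observe the effect of multicasting is to count how often the upstream source is subscribed to. The sketch below assumes reactor-core is available; the class MulticastingSequences and the counting source are hypothetical helpers built for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

final class MulticastingSequences {

    // Hypothetical cold source that counts how many times it is subscribed to.
    static Flux<Integer> countingSource(AtomicInteger subscriptions) {
        return Flux.defer(() -> {
            subscriptions.incrementAndGet();
            return Flux.just(1, 2, 3);
        });
    }

    // Connects to the source once 'n' subscribers have registered.
    static Flux<Integer> sharedAfter(Flux<Integer> source, int n) {
        return source.publish().autoConnect(n);
    }

    // Subscribes to the source once and replays its data to later subscribers.
    static Flux<Integer> replayed(Flux<Integer> source) {
        return source.cache();
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        Flux<Integer> shared = sharedAfter(countingSource(subscriptions), 2);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        shared.subscribe(first::add);   // nothing happens yet
        shared.subscribe(second::add);  // second subscriber triggers the connection
        System.out.println(first + " " + second + " subscriptions=" + subscriptions.get());
    }
}
```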