-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(concatMapTo): add higher-order lettable version of concatMapTo
- Loading branch information
Showing
3 changed files
with
74 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import { Observable, ObservableInput } from '../Observable'; | ||
import { concatMap } from './concatMap'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function concatMapTo<T, R>(observable: ObservableInput<R>): OperatorFunction<T, R>; | ||
export function concatMapTo<T, I, R>(observable: ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
/** | ||
* Projects each source value to the same Observable which is merged multiple | ||
* times in a serialized fashion on the output Observable. | ||
* | ||
* <span class="informal">It's like {@link concatMap}, but maps each value | ||
* always to the same inner Observable.</span> | ||
* | ||
* <img src="./img/concatMapTo.png" width="100%"> | ||
* | ||
* Maps each source value to the given Observable `innerObservable` regardless | ||
* of the source value, and then flattens those resulting Observables into one | ||
* single Observable, which is the output Observable. Each new `innerObservable` | ||
* instance emitted on the output Observable is concatenated with the previous | ||
* `innerObservable` instance. | ||
* | ||
* __Warning:__ if source values arrive endlessly and faster than their | ||
* corresponding inner Observables can complete, it will result in memory issues | ||
* as inner Observables amass in an unbounded buffer waiting for their turn to | ||
* be subscribed to. | ||
* | ||
* Note: `concatMapTo` is equivalent to `mergeMapTo` with concurrency parameter | ||
* set to `1`. | ||
* | ||
* @example <caption>For each click event, tick every second from 0 to 3, with no concurrency</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4)); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* // Results in the following: | ||
* // (results are not concurrent) | ||
* // For every click on the "document" it will emit values 0 to 3 spaced | ||
* // on a 1000ms interval | ||
* // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 | ||
* | ||
* @see {@link concat} | ||
* @see {@link concatAll} | ||
* @see {@link concatMap} | ||
* @see {@link mergeMapTo} | ||
* @see {@link switchMapTo} | ||
* | ||
* @param {ObservableInput} innerObservable An Observable to replace each value from | ||
* the source Observable. | ||
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] | ||
* A function to produce the value on the output Observable based on the values | ||
* and the indices of the source (outer) emission and the inner Observable | ||
* emission. The arguments passed to this function are: | ||
* - `outerValue`: the value that came from the source | ||
* - `innerValue`: the value that came from the projected Observable | ||
* - `outerIndex`: the "index" of the value that came from the source | ||
* - `innerIndex`: the "index" of the value from the projected Observable | ||
* @return {Observable} An observable of values merged together by joining the | ||
* passed observable with itself, one after the other, for each value emitted | ||
* from the source. | ||
* @method concatMapTo | ||
* @owner Observable | ||
*/ | ||
export function concatMapTo<T, I, R>( | ||
innerObservable: Observable<I>, | ||
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R | ||
): OperatorFunction<T, R> { | ||
return concatMap(() => innerObservable, resultSelector); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters