-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(combineLatest): add higher-order lettable version of combineLatest
- Loading branch information
Showing
6 changed files
with
175 additions
and
106 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
import { Observable, ObservableInput } from '../Observable'; | ||
import { ArrayObservable } from '../observable/ArrayObservable'; | ||
import { isArray } from '../util/isArray'; | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
const none = {}; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function combineLatest<T, R>(project: (v1: T) => R): OperatorFunction<T, R>; | ||
export function combineLatest<T, T2, R>(v2: ObservableInput<T2>, project: (v1: T, v2: T2) => R): OperatorFunction<T, R>; | ||
export function combineLatest<T, T2, T3, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, project: (v1: T, v2: T2, v3: T3) => R): OperatorFunction<T, R>; | ||
export function combineLatest<T, T2, T3, T4, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): OperatorFunction<T, R>; | ||
export function combineLatest<T, T2, T3, T4, T5, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): OperatorFunction<T, R>; | ||
export function combineLatest<T, T2, T3, T4, T5, T6, R>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): OperatorFunction<T, R> ; | ||
export function combineLatest<T, T2>(v2: ObservableInput<T2>): OperatorFunction<T, [T, T2]>; | ||
export function combineLatest<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>): OperatorFunction<T, [T, T2, T3]>; | ||
export function combineLatest<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): OperatorFunction<T, [T, T2, T3, T4]>; | ||
export function combineLatest<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): OperatorFunction<T, [T, T2, T3, T4, T5]>; | ||
export function combineLatest<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): OperatorFunction<T, [T, T2, T3, T4, T5, T6]> ; | ||
export function combineLatest<T, R>(...observables: Array<ObservableInput<T> | ((...values: Array<T>) => R)>): OperatorFunction<T, R>; | ||
export function combineLatest<T, R>(array: ObservableInput<T>[]): OperatorFunction<T, Array<T>>; | ||
export function combineLatest<T, TOther, R>(array: ObservableInput<TOther>[], project: (v1: T, ...values: Array<TOther>) => R): OperatorFunction<T, R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
/** | ||
* Combines multiple Observables to create an Observable whose values are | ||
* calculated from the latest values of each of its input Observables. | ||
* | ||
* <span class="informal">Whenever any input Observable emits a value, it | ||
* computes a formula using the latest values from all the inputs, then emits | ||
* the output of that formula.</span> | ||
* | ||
* <img src="./img/combineLatest.png" width="100%"> | ||
* | ||
* `combineLatest` combines the values from this Observable with values from | ||
* Observables passed as arguments. This is done by subscribing to each | ||
* Observable, in order, and collecting an array of each of the most recent | ||
* values any time any of the input Observables emits, then either taking that | ||
* array and passing it as arguments to an optional `project` function and | ||
* emitting the return value of that, or just emitting the array of recent | ||
* values directly if there is no `project` function. | ||
* | ||
* @example <caption>Dynamically calculate the Body-Mass Index from an Observable of weight and one for height</caption> | ||
* var weight = Rx.Observable.of(70, 72, 76, 79, 75); | ||
* var height = Rx.Observable.of(1.76, 1.77, 1.78); | ||
* var bmi = weight.combineLatest(height, (w, h) => w / (h * h)); | ||
* bmi.subscribe(x => console.log('BMI is ' + x)); | ||
* | ||
* // With output to console: | ||
* // BMI is 24.212293388429753 | ||
* // BMI is 23.93948099205209 | ||
* // BMI is 23.671253629592222 | ||
* | ||
* @see {@link combineAll} | ||
* @see {@link merge} | ||
* @see {@link withLatestFrom} | ||
* | ||
* @param {ObservableInput} other An input Observable to combine with the source | ||
* Observable. More than one input Observables may be given as argument. | ||
* @param {function} [project] An optional function to project the values from | ||
* the combined latest values into a new value on the output Observable. | ||
* @return {Observable} An Observable of projected values from the most recent | ||
* values from each input Observable, or an array of the most recent values from | ||
* each input Observable. | ||
* @method combineLatest | ||
* @owner Observable | ||
*/ | ||
export function combineLatest<T, R>(...observables: Array<ObservableInput<any> | | ||
Array<ObservableInput<any>> | | ||
((...values: Array<any>) => R)>): OperatorFunction<T, R> { | ||
let project: (...values: Array<any>) => R = null; | ||
if (typeof observables[observables.length - 1] === 'function') { | ||
project = <(...values: Array<any>) => R>observables.pop(); | ||
} | ||
|
||
// if the first and only other argument besides the resultSelector is an array | ||
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)` | ||
if (observables.length === 1 && isArray(observables[0])) { | ||
observables = (<any>observables[0]).slice(); | ||
} | ||
|
||
return (source: Observable<T>) => source.lift.call(new ArrayObservable([source, ...observables]), new CombineLatestOperator(project)); | ||
} | ||
|
||
export class CombineLatestOperator<T, R> implements Operator<T, R> { | ||
constructor(private project?: (...values: Array<any>) => R) { | ||
} | ||
|
||
call(subscriber: Subscriber<R>, source: any): any { | ||
return source.subscribe(new CombineLatestSubscriber(subscriber, this.project)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
export class CombineLatestSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
private active: number = 0; | ||
private values: any[] = []; | ||
private observables: any[] = []; | ||
private toRespond: number; | ||
|
||
constructor(destination: Subscriber<R>, private project?: (...values: Array<any>) => R) { | ||
super(destination); | ||
} | ||
|
||
protected _next(observable: any) { | ||
this.values.push(none); | ||
this.observables.push(observable); | ||
} | ||
|
||
protected _complete() { | ||
const observables = this.observables; | ||
const len = observables.length; | ||
if (len === 0) { | ||
this.destination.complete(); | ||
} else { | ||
this.active = len; | ||
this.toRespond = len; | ||
for (let i = 0; i < len; i++) { | ||
const observable = observables[i]; | ||
this.add(subscribeToResult(this, observable, observable, i)); | ||
} | ||
} | ||
} | ||
|
||
notifyComplete(unused: Subscriber<R>): void { | ||
if ((this.active -= 1) === 0) { | ||
this.destination.complete(); | ||
} | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: R, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
const values = this.values; | ||
const oldVal = values[outerIndex]; | ||
const toRespond = !this.toRespond | ||
? 0 | ||
: oldVal === none ? --this.toRespond : this.toRespond; | ||
values[outerIndex] = innerValue; | ||
|
||
if (toRespond === 0) { | ||
if (this.project) { | ||
this._tryProject(values); | ||
} else { | ||
this.destination.next(values.slice()); | ||
} | ||
} | ||
} | ||
|
||
private _tryProject(values: any[]) { | ||
let result: any; | ||
try { | ||
result = this.project.apply(this, values); | ||
} catch (err) { | ||
this.destination.error(err); | ||
return; | ||
} | ||
this.destination.next(result); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters