diff --git a/src/observable/merge.ts b/src/observable/merge.ts index 25843ca227..599ee85684 100644 --- a/src/observable/merge.ts +++ b/src/observable/merge.ts @@ -1,3 +1,101 @@ -import { mergeStatic } from '../operator/merge'; +import { Observable, ObservableInput } from '../Observable'; +import { IScheduler } from '../Scheduler'; +import { ArrayObservable } from './ArrayObservable'; +import { isScheduler } from '../util/isScheduler'; +import { mergeAll } from '../operators/mergeAll'; -export const merge = mergeStatic; \ No newline at end of file +/* tslint:disable:max-line-length */ +export function merge(v1: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, scheduler?: IScheduler): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; +export function merge(...observables: (ObservableInput | IScheduler | number)[]): Observable; +export function merge(...observables: (ObservableInput | IScheduler | number)[]): Observable; +/* tslint:enable:max-line-length */ +/** + * Creates an output Observable which concurrently emits all values from every + * given input Observable. + * + * Flattens multiple Observables together by blending + * their values into one Observable. + * + * + * + * `merge` subscribes to each given input Observable (as arguments), and simply + * forwards (without doing any transformation) all the values from all the input + * Observables to the output Observable. The output Observable only completes + * once all input Observables have completed. Any error delivered by an input + * Observable will be immediately emitted on the output Observable. + * + * @example Merge together two Observables: 1s interval and clicks + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var timer = Rx.Observable.interval(1000); + * var clicksOrTimer = Rx.Observable.merge(clicks, timer); + * clicksOrTimer.subscribe(x => console.log(x)); + * + * // Results in the following: + * // timer will emit ascending values, one every second(1000ms) to console + * // clicks logs MouseEvents to console everytime the "document" is clicked + * // Since the two streams are merged you see these happening + * // as they occur. + * + * @example Merge together 3 Observables, but only 2 run concurrently + * var timer1 = Rx.Observable.interval(1000).take(10); + * var timer2 = Rx.Observable.interval(2000).take(6); + * var timer3 = Rx.Observable.interval(500).take(10); + * var concurrent = 2; // the argument + * var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent); + * merged.subscribe(x => console.log(x)); + * + * // Results in the following: + * // - First timer1 and timer2 will run concurrently + * // - timer1 will emit a value every 1000ms for 10 iterations + * // - timer2 will emit a value every 2000ms for 6 iterations + * // - after timer1 hits it's max iteration, timer2 will + * // continue, and timer3 will start to run concurrently with timer2 + * // - when timer2 hits it's max iteration it terminates, and + * // timer3 will continue to emit a value every 500ms until it is complete + * + * @see {@link mergeAll} + * @see {@link mergeMap} + * @see {@link mergeMapTo} + * @see {@link mergeScan} + * + * @param {...ObservableInput} observables Input Observables to merge together. + * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input + * Observables being subscribed to concurrently. + * @param {Scheduler} [scheduler=null] The IScheduler to use for managing + * concurrency of input Observables. + * @return {Observable} an Observable that emits items that are the result of + * every input Observable. + * @static true + * @name merge + * @owner Observable + */ +export function merge(...observables: Array | IScheduler | number>): Observable { + let concurrent = Number.POSITIVE_INFINITY; + let scheduler: IScheduler = null; + let last: any = observables[observables.length - 1]; + if (isScheduler(last)) { + scheduler = observables.pop(); + if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') { + concurrent = observables.pop(); + } + } else if (typeof last === 'number') { + concurrent = observables.pop(); + } + + if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) { + return >observables[0]; + } + + return mergeAll(concurrent)(new ArrayObservable(observables, scheduler)) as Observable; +} diff --git a/src/operator/concat.ts b/src/operator/concat.ts index d305552c74..2e020d99e2 100644 --- a/src/operator/concat.ts +++ b/src/operator/concat.ts @@ -2,6 +2,8 @@ import { Observable, ObservableInput } from '../Observable'; import { IScheduler } from '../Scheduler'; import { concat as higherOrder } from '../operators/concat'; +export { concat as concatStatic } from '../observable/concat'; + /* tslint:disable:max-line-length */ export function concat(this: Observable, scheduler?: IScheduler): Observable; export function concat(this: Observable, v2: ObservableInput, scheduler?: IScheduler): Observable; diff --git a/src/operator/merge.ts b/src/operator/merge.ts index b04f44e820..51733bf8e5 100644 --- a/src/operator/merge.ts +++ b/src/operator/merge.ts @@ -2,7 +2,7 @@ import { Observable, ObservableInput } from '../Observable'; import { IScheduler } from '../Scheduler'; import { merge as higherOrder } from '../operators/merge'; -export { mergeStatic } from '../operators/merge'; +export { merge as mergeStatic } from '../observable/merge'; /* tslint:disable:max-line-length */ export function merge(this: Observable, scheduler?: IScheduler): Observable; diff --git a/src/operators/concat.ts b/src/operators/concat.ts index 8f3f54f89c..593525972b 100644 --- a/src/operators/concat.ts +++ b/src/operators/concat.ts @@ -3,6 +3,8 @@ import { IScheduler } from '../Scheduler'; import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces'; import { concat as concatStatic } from '../observable/concat'; +export { concat as concatStatic } from '../observable/concat'; + /* tslint:disable:max-line-length */ export function concat(scheduler?: IScheduler): MonoTypeOperatorFunction; export function concat(v2: ObservableInput, scheduler?: IScheduler): OperatorFunction; diff --git a/src/operators/merge.ts b/src/operators/merge.ts index 21c58f3fd9..69524e387b 100644 --- a/src/operators/merge.ts +++ b/src/operators/merge.ts @@ -1,9 +1,9 @@ import { Observable, ObservableInput } from '../Observable'; import { IScheduler } from '../Scheduler'; -import { ArrayObservable } from '../observable/ArrayObservable'; -import { mergeAll } from './mergeAll'; -import { isScheduler } from '../util/isScheduler'; import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces'; +import { merge as mergeStatic } from '../observable/merge'; + +export { merge as mergeStatic } from '../observable/merge'; /* tslint:disable:max-line-length */ export function merge(scheduler?: IScheduler): MonoTypeOperatorFunction; @@ -21,27 +21,6 @@ export function merge(v2: ObservableInput, v3: Observ export function merge(...observables: Array | IScheduler | number>): MonoTypeOperatorFunction; export function merge(...observables: Array | IScheduler | number>): OperatorFunction; /* tslint:enable:max-line-length */ - -export function merge(...observables: Array | IScheduler | number>): OperatorFunction { - return (source: Observable) => source.lift.call(mergeStatic(source, ...observables)); -} - -/* tslint:disable:max-line-length */ -export function mergeStatic(v1: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, scheduler?: IScheduler): Observable; -export function mergeStatic(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent?: number, scheduler?: IScheduler): Observable; -export function mergeStatic(...observables: (ObservableInput | IScheduler | number)[]): Observable; -export function mergeStatic(...observables: (ObservableInput | IScheduler | number)[]): Observable; -/* tslint:enable:max-line-length */ /** * Creates an output Observable which concurrently emits all values from every * given input Observable. @@ -51,73 +30,43 @@ export function mergeStatic(...observables: (ObservableInput | ISched * * * - * `merge` subscribes to each given input Observable (as arguments), and simply - * forwards (without doing any transformation) all the values from all the input - * Observables to the output Observable. The output Observable only completes - * once all input Observables have completed. Any error delivered by an input - * Observable will be immediately emitted on the output Observable. + * `merge` subscribes to each given input Observable (either the source or an + * Observable given as argument), and simply forwards (without doing any + * transformation) all the values from all the input Observables to the output + * Observable. The output Observable only completes once all input Observables + * have completed. Any error delivered by an input Observable will be immediately + * emitted on the output Observable. * * @example Merge together two Observables: 1s interval and clicks * var clicks = Rx.Observable.fromEvent(document, 'click'); * var timer = Rx.Observable.interval(1000); - * var clicksOrTimer = Rx.Observable.merge(clicks, timer); + * var clicksOrTimer = clicks.merge(timer); * clicksOrTimer.subscribe(x => console.log(x)); * - * // Results in the following: - * // timer will emit ascending values, one every second(1000ms) to console - * // clicks logs MouseEvents to console everytime the "document" is clicked - * // Since the two streams are merged you see these happening - * // as they occur. - * * @example Merge together 3 Observables, but only 2 run concurrently * var timer1 = Rx.Observable.interval(1000).take(10); * var timer2 = Rx.Observable.interval(2000).take(6); * var timer3 = Rx.Observable.interval(500).take(10); * var concurrent = 2; // the argument - * var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent); + * var merged = timer1.merge(timer2, timer3, concurrent); * merged.subscribe(x => console.log(x)); * - * // Results in the following: - * // - First timer1 and timer2 will run concurrently - * // - timer1 will emit a value every 1000ms for 10 iterations - * // - timer2 will emit a value every 2000ms for 6 iterations - * // - after timer1 hits it's max iteration, timer2 will - * // continue, and timer3 will start to run concurrently with timer2 - * // - when timer2 hits it's max iteration it terminates, and - * // timer3 will continue to emit a value every 500ms until it is complete - * * @see {@link mergeAll} * @see {@link mergeMap} * @see {@link mergeMapTo} * @see {@link mergeScan} * - * @param {...ObservableInput} observables Input Observables to merge together. + * @param {ObservableInput} other An input Observable to merge with the source + * Observable. More than one input Observables may be given as argument. * @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input * Observables being subscribed to concurrently. * @param {Scheduler} [scheduler=null] The IScheduler to use for managing * concurrency of input Observables. - * @return {Observable} an Observable that emits items that are the result of + * @return {Observable} An Observable that emits items that are the result of * every input Observable. - * @static true - * @name merge + * @method merge * @owner Observable */ -export function mergeStatic(...observables: Array | IScheduler | number>): Observable { - let concurrent = Number.POSITIVE_INFINITY; - let scheduler: IScheduler = null; - let last: any = observables[observables.length - 1]; - if (isScheduler(last)) { - scheduler = observables.pop(); - if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') { - concurrent = observables.pop(); - } - } else if (typeof last === 'number') { - concurrent = observables.pop(); - } - - if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) { - return >observables[0]; - } - - return mergeAll(concurrent)(new ArrayObservable(observables, scheduler)) as Observable; +export function merge(...observables: Array | IScheduler | number>): OperatorFunction { + return (source: Observable) => source.lift.call(mergeStatic(source, ...observables)); }