Skip to content

Commit

Permalink
feat(lettables): add higher-order lettable versions of concat, concat…
Browse files Browse the repository at this point in the history
…All, mergeAll

- moves static impl of concat to `observable\/concat`
  • Loading branch information
benlesh committed Jun 16, 2017
1 parent e646851 commit d7e8be7
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 124 deletions.
117 changes: 115 additions & 2 deletions src/observable/concat.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,116 @@
import { concatStatic } from '../operator/concat';
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { isScheduler } from '../util/isScheduler';
import { of } from './of';
import { from } from './from';
import { concatAll } from '../operators';

export const concat = concatStatic;
/* tslint:disable:max-line-length */
export function concat<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function concat<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function concat<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function concat<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function concat<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function concat<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function concat<T>(...observables: (ObservableInput<T> | IScheduler)[]): Observable<T>;
export function concat<T, R>(...observables: (ObservableInput<any> | IScheduler)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which sequentially emits all values from given
* Observable and then moves on to the next.
*
* <span class="informal">Concatenates multiple Observables together by
* sequentially emitting their values, one Observable after the other.</span>
*
* <img src="./img/concat.png" width="100%">
*
* `concat` joins multiple Observables together, by subscribing to them one at a time and
* merging their results into the output Observable. You can pass either an array of
* Observables, or put them directly as arguments. Passing an empty array will result
* in Observable that completes immediately.
*
* `concat` will subscribe to first input Observable and emit all its values, without
* changing or affecting them in any way. When that Observable completes, it will
* subscribe to then next Observable passed and, again, emit its values. This will be
* repeated, until the operator runs out of Observables. When last input Observable completes,
* `concat` will complete as well. At any given moment only one Observable passed to operator
* emits values. If you would like to emit values from passed Observables concurrently, check out
* {@link merge} instead, especially with optional `concurrent` parameter. As a matter of fact,
* `concat` is an equivalent of `merge` operator with `concurrent` parameter set to `1`.
*
* Note that if some input Observable never completes, `concat` will also never complete
* and Observables following the one that did not complete will never be subscribed. On the other
* hand, if some Observable simply completes immediately after it is subscribed, it will be
* invisible for `concat`, which will just move on to the next Observable.
*
* If any Observable in chain errors, instead of passing control to the next Observable,
* `concat` will error immediately as well. Observables that would be subscribed after
* the one that emitted error, never will.
*
* If you pass to `concat` the same Observable many times, its stream of values
* will be "replayed" on every subscription, which means you can repeat given Observable
* as many times as you like. If passing the same Observable to `concat` 1000 times becomes tedious,
* you can always use {@link repeat}.
*
* @example <caption>Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10</caption>
* var timer = Rx.Observable.interval(1000).take(4);
* var sequence = Rx.Observable.range(1, 10);
* var result = Rx.Observable.concat(timer, sequence);
* result.subscribe(x => console.log(x));
*
* // results in:
* // 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
*
*
* @example <caption>Concatenate an array of 3 Observables</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
* result.subscribe(x => console.log(x));
*
* // results in the following:
* // (Prints to console sequentially)
* // -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
* // -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
* // -500ms-> 0 -500ms-> 1 -500ms-> ... 9
*
*
* @example <caption>Concatenate the same Observable to repeat it</caption>
* const timer = Rx.Observable.interval(1000).take(2);
*
* Rx.Observable.concat(timer, timer) // concating the same Observable!
* .subscribe(
* value => console.log(value),
* err => {},
* () => console.log('...and it is done!')
* );
*
* // Logs:
* // 0 after 1s
* // 1 after 2s
* // 0 after 3s
* // 1 after 4s
* // "...and it is done!" also after 4s
*
* @see {@link concatAll}
* @see {@link concatMap}
* @see {@link concatMapTo}
*
* @param {ObservableInput} input1 An input Observable to concatenate with others.
* @param {ObservableInput} input2 An input Observable to concatenate with others.
* More than one input Observables may be given as argument.
* @param {Scheduler} [scheduler=null] An optional IScheduler to schedule each
* Observable subscription on.
* @return {Observable} All values of each passed Observable merged into a
* single Observable, in order, in serial fashion.
* @static true
* @name concat
* @owner Observable
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | IScheduler>): Observable<R> {
if (observables.length === 1 || (observables.length === 2 && isScheduler(observables[1]))) {
return from(<any>observables[0]);
}
return concatAll()(of(...observables));
}
123 changes: 2 additions & 121 deletions src/operator/concat.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { isScheduler } from '../util/isScheduler';
import { ArrayObservable } from '../observable/ArrayObservable';
import { MergeAllOperator } from './mergeAll';
import { concat as higherOrder } from '../operators';

/* tslint:disable:max-line-length */
export function concat<T>(this: Observable<T>, scheduler?: IScheduler): Observable<T>;
Expand Down Expand Up @@ -65,122 +63,5 @@ export function concat<T, R>(this: Observable<T>, ...observables: Array<Observab
* @owner Observable
*/
export function concat<T, R>(this: Observable<T>, ...observables: Array<ObservableInput<any> | IScheduler>): Observable<R> {
return this.lift.call(concatStatic<T, R>(this, ...observables));
}

/* tslint:disable:max-line-length */
export function concatStatic<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>;
export function concatStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>;
export function concatStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>;
export function concatStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>;
export function concatStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>;
export function concatStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>;
export function concatStatic<T>(...observables: (ObservableInput<T> | IScheduler)[]): Observable<T>;
export function concatStatic<T, R>(...observables: (ObservableInput<any> | IScheduler)[]): Observable<R>;
/* tslint:enable:max-line-length */
/**
* Creates an output Observable which sequentially emits all values from given
* Observable and then moves on to the next.
*
* <span class="informal">Concatenates multiple Observables together by
* sequentially emitting their values, one Observable after the other.</span>
*
* <img src="./img/concat.png" width="100%">
*
* `concat` joins multiple Observables together, by subscribing to them one at a time and
* merging their results into the output Observable. You can pass either an array of
* Observables, or put them directly as arguments. Passing an empty array will result
* in Observable that completes immediately.
*
* `concat` will subscribe to first input Observable and emit all its values, without
* changing or affecting them in any way. When that Observable completes, it will
* subscribe to then next Observable passed and, again, emit its values. This will be
* repeated, until the operator runs out of Observables. When last input Observable completes,
* `concat` will complete as well. At any given moment only one Observable passed to operator
* emits values. If you would like to emit values from passed Observables concurrently, check out
* {@link merge} instead, especially with optional `concurrent` parameter. As a matter of fact,
* `concat` is an equivalent of `merge` operator with `concurrent` parameter set to `1`.
*
* Note that if some input Observable never completes, `concat` will also never complete
* and Observables following the one that did not complete will never be subscribed. On the other
* hand, if some Observable simply completes immediately after it is subscribed, it will be
* invisible for `concat`, which will just move on to the next Observable.
*
* If any Observable in chain errors, instead of passing control to the next Observable,
* `concat` will error immediately as well. Observables that would be subscribed after
* the one that emitted error, never will.
*
* If you pass to `concat` the same Observable many times, its stream of values
* will be "replayed" on every subscription, which means you can repeat given Observable
* as many times as you like. If passing the same Observable to `concat` 1000 times becomes tedious,
* you can always use {@link repeat}.
*
* @example <caption>Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10</caption>
* var timer = Rx.Observable.interval(1000).take(4);
* var sequence = Rx.Observable.range(1, 10);
* var result = Rx.Observable.concat(timer, sequence);
* result.subscribe(x => console.log(x));
*
* // results in:
* // 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
*
*
* @example <caption>Concatenate an array of 3 Observables</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var result = Rx.Observable.concat([timer1, timer2, timer3]); // note that array is passed
* result.subscribe(x => console.log(x));
*
* // results in the following:
* // (Prints to console sequentially)
* // -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
* // -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
* // -500ms-> 0 -500ms-> 1 -500ms-> ... 9
*
*
* @example <caption>Concatenate the same Observable to repeat it</caption>
* const timer = Rx.Observable.interval(1000).take(2);
*
* Rx.Observable.concat(timer, timer) // concating the same Observable!
* .subscribe(
* value => console.log(value),
* err => {},
* () => console.log('...and it is done!')
* );
*
* // Logs:
* // 0 after 1s
* // 1 after 2s
* // 0 after 3s
* // 1 after 4s
* // "...and it is done!" also after 4s
*
* @see {@link concatAll}
* @see {@link concatMap}
* @see {@link concatMapTo}
*
* @param {ObservableInput} input1 An input Observable to concatenate with others.
* @param {ObservableInput} input2 An input Observable to concatenate with others.
* More than one input Observables may be given as argument.
* @param {Scheduler} [scheduler=null] An optional IScheduler to schedule each
* Observable subscription on.
* @return {Observable} All values of each passed Observable merged into a
* single Observable, in order, in serial fashion.
* @static true
* @name concat
* @owner Observable
*/
export function concatStatic<T, R>(...observables: Array<ObservableInput<any> | IScheduler>): Observable<R> {
let scheduler: IScheduler = null;
let args = <any[]>observables;
if (isScheduler(args[observables.length - 1])) {
scheduler = args.pop();
}

if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) {
return <Observable<R>>observables[0];
}

return new ArrayObservable(observables, scheduler).lift(new MergeAllOperator<R>(1));
return higherOrder(...observables)(this);
}
2 changes: 1 addition & 1 deletion src/operator/startWith.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Observable } from '../Observable';
import { ArrayObservable } from '../observable/ArrayObservable';
import { ScalarObservable } from '../observable/ScalarObservable';
import { EmptyObservable } from '../observable/EmptyObservable';
import { concatStatic } from './concat';
import { concat as concatStatic } from '../observable/concat';
import { isScheduler } from '../util/isScheduler';

/* tslint:disable:max-line-length */
Expand Down
70 changes: 70 additions & 0 deletions src/operators/concat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { Observable, ObservableInput } from '../Observable';
import { IScheduler } from '../Scheduler';
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces';
import { concat as concatStatic } from '../observable/concat';

/* tslint:disable:max-line-length */
export function concat<T>(scheduler?: IScheduler): MonoTypeOperatorFunction<T>;
export function concat<T, T2>(v2: ObservableInput<T2>, scheduler?: IScheduler): OperatorFunction<T, T | T2>;
export function concat<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3>;
export function concat<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4>;
export function concat<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5>;
export function concat<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>;
export function concat<T>(...observables: Array<ObservableInput<T> | IScheduler>): MonoTypeOperatorFunction<T>;
export function concat<T, R>(...observables: Array<ObservableInput<any> | IScheduler>): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

/**
* Creates an output Observable which sequentially emits all values from every
* given input Observable after the current Observable.
*
* <span class="informal">Concatenates multiple Observables together by
* sequentially emitting their values, one Observable after the other.</span>
*
* <img src="./img/concat.png" width="100%">
*
* Joins this Observable with multiple other Observables by subscribing to them
* one at a time, starting with the source, and merging their results into the
* output Observable. Will wait for each Observable to complete before moving
* on to the next.
*
* @example <caption>Concatenate a timer counting from 0 to 3 with a synchronous sequence from 1 to 10</caption>
* var timer = Rx.Observable.interval(1000).take(4);
* var sequence = Rx.Observable.range(1, 10);
* var result = timer.concat(sequence);
* result.subscribe(x => console.log(x));
*
* // results in:
* // 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 -immediate-> 1 ... 10
*
* @example <caption>Concatenate 3 Observables</caption>
* var timer1 = Rx.Observable.interval(1000).take(10);
* var timer2 = Rx.Observable.interval(2000).take(6);
* var timer3 = Rx.Observable.interval(500).take(10);
* var result = timer1.concat(timer2, timer3);
* result.subscribe(x => console.log(x));
*
* // results in the following:
* // (Prints to console sequentially)
* // -1000ms-> 0 -1000ms-> 1 -1000ms-> ... 9
* // -2000ms-> 0 -2000ms-> 1 -2000ms-> ... 5
* // -500ms-> 0 -500ms-> 1 -500ms-> ... 9
*
* @see {@link concatAll}
* @see {@link concatMap}
* @see {@link concatMapTo}
*
* @param {ObservableInput} other An input Observable to concatenate after the source
* Observable. More than one input Observables may be given as argument.
* @param {Scheduler} [scheduler=null] An optional IScheduler to schedule each
* Observable subscription on.
* @return {Observable} All values of each passed Observable merged into a
* single Observable, in order, in serial fashion.
* @method concat
* @owner Observable
*/
export function concat<T, R>(...observables: Array<ObservableInput<any> | IScheduler>): OperatorFunction<T, R> {
return function concatOperatorFunction(source: Observable<T>): Observable<T | R> {
return source.lift.call(concatStatic<T, R>(source, ...observables));
};
}
55 changes: 55 additions & 0 deletions src/operators/concatAll.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

import { mergeAll } from './mergeAll';
import { MonoTypeOperatorFunction } from '../interfaces';

/**
* Converts a higher-order Observable into a first-order Observable by
* concatenating the inner Observables in order.
*
* <span class="informal">Flattens an Observable-of-Observables by putting one
* inner Observable after the other.</span>
*
* <img src="./img/concatAll.png" width="100%">
*
* Joins every Observable emitted by the source (a higher-order Observable), in
* a serial fashion. It subscribes to each inner Observable only after the
* previous inner Observable has completed, and merges all of their values into
* the returned observable.
*
* __Warning:__ If the source Observable emits Observables quickly and
* endlessly, and the inner Observables it emits generally complete slower than
* the source emits, you can run into memory issues as the incoming Observables
* collect in an unbounded buffer.
*
* Note: `concatAll` is equivalent to `mergeAll` with concurrency parameter set
* to `1`.
*
* @example <caption>For each click event, tick every second from 0 to 3, with no concurrency</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var higherOrder = clicks.map(ev => Rx.Observable.interval(1000).take(4));
* var firstOrder = higherOrder.concatAll();
* firstOrder.subscribe(x => console.log(x));
*
* // Results in the following:
* // (results are not concurrent)
* // For every click on the "document" it will emit values 0 to 3 spaced
* // on a 1000ms interval
* // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
*
* @see {@link combineAll}
* @see {@link concat}
* @see {@link concatMap}
* @see {@link concatMapTo}
* @see {@link exhaust}
* @see {@link mergeAll}
* @see {@link switch}
* @see {@link zipAll}
*
* @return {Observable} An Observable emitting values from all the inner
* Observables concatenated.
* @method concatAll
* @owner Observable
*/
export function concatAll<T>(): MonoTypeOperatorFunction<T> {
return mergeAll(1);
}
Loading

0 comments on commit d7e8be7

Please sign in to comment.