Skip to content

Commit

Permalink
feat(pipe): add pipe method ot Observable
Browse files Browse the repository at this point in the history
Also adds type overloads for pipe and for compose

NOTE: For some reason TypeScript would not let me call compose(...operations) in the pipe method
  • Loading branch information
benlesh committed Aug 10, 2017
1 parent 5281229 commit 9f6312d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 0 deletions.
29 changes: 29 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as sinon from 'sinon';
import * as Rx from '../dist/cjs/Rx';
import {TeardownLogic} from '../dist/cjs/Subscription';
import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports
import { map } from '../dist/cjs/operators';

declare const { asDiagram, rxTestScheduler };
declare const cold: typeof marbleTestingSignature.cold;
Expand Down Expand Up @@ -621,6 +622,34 @@ describe('Observable', () => {
});
});
});

describe('pipe', () => {
it('should exist', () => {
const source = Observable.of('test');
expect(source.pipe).to.be.a('function');
});

it('should pipe multiple operations', (done) => {
Observable.of('test')
.pipe(
map((x: string) => x + x),
map((x: string) => x + '!!!')
)
.subscribe(
x => {
expect(x).to.equal('testtest!!!');
},
null,
done
);
});

it('should return the same observable if there are no arguments', () => {
const source = Observable.of('test');
const result = source.pipe();
expect(result).to.equal(source);
});
});
});

/** @test {Observable} */
Expand Down
41 changes: 41 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { toSubscriber } from './util/toSubscriber';
import { IfObservable } from './observable/IfObservable';
import { ErrorObservable } from './observable/ErrorObservable';
import { observable as Symbol_observable } from './symbol/observable';
import { OperatorFunction } from './interfaces';
import { compose } from './util/compose';

export interface Subscribable<T> {
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
Expand Down Expand Up @@ -286,4 +288,43 @@ export class Observable<T> implements Subscribable<T> {
[Symbol_observable]() {
return this;
}

/* tslint:disable:max-line-length */
pipe(): Observable<T>
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>
pipe<A, B, C, D>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>
pipe<A, B, C, D, E>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>
pipe<A, B, C, D, E, F>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>
pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>
pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>
/* tslint:enable:max-line-length */

/**
* Used to stitch together functional operators into a chain.
* @method pipe
* @return {Observable} the Observable result of all of the operators having
* been called in the order they were passed in.
*
* @example
*
* import { map, filter, scan } from 'rxjs/operators';
*
* Rx.Observable.interval(1000)
* .pipe(
* filter(x => x % 2 === 0),
* map(x => x + x),
* scan((acc, x) => acc + x)
* )
* .subscribe(x => console.log(x))
*/
pipe<R>(...operations: OperatorFunction<T, R>[]): Observable<R> {
if (operations.length === 0) {
return this as any;
}

return compose.apply(this, operations)(this);
}
}
13 changes: 13 additions & 0 deletions src/util/compose.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
import { noop } from './noop';
import { UnaryFunction } from '../interfaces';

/* tslint:disable:max-line-length */
export function compose<T>(): UnaryFunction<T, T>;
export function compose<T, A>(op1: UnaryFunction<T, A>): UnaryFunction<T, A>;
export function compose<T, A, B>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>): UnaryFunction<T, B>;
export function compose<T, A, B, C>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>): UnaryFunction<T, C>;
export function compose<T, A, B, C, D>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>): UnaryFunction<T, D>;
export function compose<T, A, B, C, D, E>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>): UnaryFunction<T, E>;
export function compose<T, A, B, C, D, E, F>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>): UnaryFunction<T, F>;
export function compose<T, A, B, C, D, E, F, G>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>): UnaryFunction<T, G>;
export function compose<T, A, B, C, D, E, F, G, H>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>, op8: UnaryFunction<G, H>): UnaryFunction<T, H>;
export function compose<T, A, B, C, D, E, F, G, H, I>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>, op8: UnaryFunction<G, H>, op9: UnaryFunction<H, I>): UnaryFunction<T, I>;
/* tslint:enable:max-line-length */

export function compose<T, R>(...fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (!fns) {
return noop as UnaryFunction<any, any>;
Expand Down

0 comments on commit 9f6312d

Please sign in to comment.