Skip to content

Commit

Permalink
feat(observable): add Observable.all (forkJoin)
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Aug 31, 2015
1 parent f64b81b commit 44a4ee1
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 1 deletion.
14 changes: 14 additions & 0 deletions spec/observables/forkJoin-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.forkJoin', function () {
it('should join the last values of the provided observables into an array', function(done) {
Observable.forkJoin(Observable.of(1, 2, 3, 'a'),
Observable.of('b'),
Observable.of(1, 2, 3, 4, 'c'))
.subscribe(function (x) {
expect(x).toEqual(['a', 'b', 'c']);
}, null, done);
});
});
3 changes: 2 additions & 1 deletion src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ export default class Observable<T> {
static fromPromise: <T>(promise: Promise<T>) => Observable<T>;
static timer: (delay: number) => Observable<number>;
static interval: (interval: number) => Observable<number>;

static forkJoin: (...observables: Observable<any>[]) => Observable<any[]>;

static concat: (...observables: any[]) => Observable<any>;
concat: (...observables: any[]) => Observable<any>;
concatAll: () => Observable<any>;
Expand Down
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import ScalarObservable from './observables/ScalarObservable';
import TimerObservable from './observables/TimerObservable';
import FromEventPatternObservable from './observables/FromEventPatternObservable';
import FromEventObservable from './observables/FromEventObservable';
import ForkJoinObservable from './observables/ForkJoinObservable';

Observable.defer = DeferObservable.create;
Observable.from = IteratorObservable.create;
Expand All @@ -30,6 +31,7 @@ Observable.fromPromise = PromiseObservable.create;
Observable.of = ArrayObservable.of;
Observable.range = RangeObservable.create;
Observable.fromEventPattern = FromEventPatternObservable.create;
Observable.forkJoin = ForkJoinObservable.create;

Observable.just = ScalarObservable.create;
Observable.return = ScalarObservable.create;
Expand Down
56 changes: 56 additions & 0 deletions src/observables/ForkJoinObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import Observable from '../Observable';
import Observer from '../Observer';
import Subscriber from '../Subscriber';

export default class ForkJoinObservable<T> extends Observable<T> {
constructor(private observables: Observable<any>[]) {
super();
}

static create<R>(...observables: Observable<any>[]): Observable<R> {
return new ForkJoinObservable(observables);
}

_subscribe(subscriber: Observer<any>) {
const observables = this.observables;
const len = observables.length;
let context = { complete: 0, total: len, values: emptyArray(len) };
for (let i = 0; i < len; i++) {
observables[i].subscribe(new AllSubscriber(subscriber, this, i, context))
}
}
}

class AllSubscriber<T> extends Subscriber<T> {
private _value: T;

constructor(destination: Observer<T>, private parent: ForkJoinObservable<T>, private index: number,
private context: { complete: number, total: number, values: any[] }) {
super(destination);
}

_next(value: T) {
this._value = value;
}

_complete() {
const context = this.context;
context.values[this.index] = this._value;
if (context.values.every(hasValue)) {
this.destination.next(context.values);
this.destination.complete();
}
}
}

function hasValue(x) {
return x !== null;
}

function emptyArray(len: number): any[] {
var arr = [];
for (let i = 0; i < len; i++) {
arr.push(null);
}
return arr;
}

0 comments on commit 44a4ee1

Please sign in to comment.