Skip to content

Commit

Permalink
fix(mergeAll): use higher-order lettable version of mergeAll
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonaden committed Sep 28, 2017
1 parent 60c96ab commit f0b703b
Showing 1 changed file with 2 additions and 57 deletions.
59 changes: 2 additions & 57 deletions src/operator/mergeAll.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { Observable } from '../Observable';
import { Operator } from '../Operator';
import { Observer } from '../Observer';
import { Subscription } from '../Subscription';
import { OuterSubscriber } from '../OuterSubscriber';
import { Subscribable } from '../Observable';
import { subscribeToResult } from '../util/subscribeToResult';
import { mergeAll as higherOrder } from '../operators/mergeAll';

export function mergeAll<T>(this: Observable<T>, concurrent?: number): T;
export function mergeAll<T, R>(this: Observable<T>, concurrent?: number): Subscribable<R>;
Expand Down Expand Up @@ -54,56 +50,5 @@ export function mergeAll<T, R>(this: Observable<T>, concurrent?: number): Subscr
* @owner Observable
*/
export function mergeAll<T>(this: Observable<T>, concurrent: number = Number.POSITIVE_INFINITY): T {
return <any>this.lift<any>(new MergeAllOperator<T>(concurrent));
}

export class MergeAllOperator<T> implements Operator<Observable<T>, T> {
constructor(private concurrent: number) {
}

call(observer: Observer<T>, source: any): any {
return source.subscribe(new MergeAllSubscriber(observer, this.concurrent));
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
export class MergeAllSubscriber<T> extends OuterSubscriber<Observable<T>, T> {
private hasCompleted: boolean = false;
private buffer: Observable<T>[] = [];
private active: number = 0;

constructor(destination: Observer<T>, private concurrent: number) {
super(destination);
}

protected _next(observable: Observable<T>) {
if (this.active < this.concurrent) {
this.active++;
this.add(subscribeToResult<Observable<T>, T>(this, observable));
} else {
this.buffer.push(observable);
}
}

protected _complete() {
this.hasCompleted = true;
if (this.active === 0 && this.buffer.length === 0) {
this.destination.complete();
}
}

notifyComplete(innerSub: Subscription) {
const buffer = this.buffer;
this.remove(innerSub);
this.active--;
if (buffer.length > 0) {
this._next(buffer.shift());
} else if (this.active === 0 && this.hasCompleted) {
this.destination.complete();
}
}
return <any>higherOrder(concurrent)(this);
}

0 comments on commit f0b703b

Please sign in to comment.