Skip to content

Commit

Permalink
fix(takeUntil): unsubscribe notifier when it completes
Browse files Browse the repository at this point in the history
Fix operator takeUntil to automatically unsubscribe the notifier when it
completes. This is to conform RxJS Next with RxJS 4.

Somewhat related to issue ReactiveX#577.
  • Loading branch information
Andre Medeiros committed Oct 27, 2015
1 parent ba997c6 commit ec9505f
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export interface CoreOperators<T> {
switchMap?: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
switchMapTo?: <R>(observable: Observable<any>, projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
take?: (count: number) => Observable<T>;
takeUntil?: (observable: Observable<any>) => Observable<T>;
takeUntil?: (notifier: Observable<any>) => Observable<T>;
throttle?: (delay: number, scheduler?: Scheduler) => Observable<T>;
timeout?: <T>(due: number|Date, errorToSend?: any, scheduler?: Scheduler) => Observable<T>;
timeoutWith?: <T>(due: number|Date, withObservable: Observable<any>, scheduler?: Scheduler) => Observable<T>;
Expand Down
31 changes: 19 additions & 12 deletions src/operators/takeUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,48 @@ import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';

export default function takeUntil<T>(observable: Observable<any>) {
return this.lift(new TakeUntilOperator(observable));
export default function takeUntil<T>(notifier: Observable<any>) {
return this.lift(new TakeUntilOperator(notifier));
}

class TakeUntilOperator<T, R> implements Operator<T, R> {

observable: Observable<any>;

constructor(observable: Observable<any>) {
this.observable = observable;
constructor(private notifier: Observable<any>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new TakeUntilSubscriber(subscriber, this.observable);
return new TakeUntilSubscriber(subscriber, this.notifier);
}
}

class TakeUntilSubscriber<T> extends Subscriber<T> {
private notificationSubscriber: TakeUntilInnerSubscriber<any> = null;

constructor(destination: Subscriber<T>,
observable: Observable<any>) {
private notifier: Observable<any>) {
super(destination);
this.add(observable._subscribe(new TakeUntilInnerSubscriber(destination)));
this.notificationSubscriber = new TakeUntilInnerSubscriber(destination);
this.add(notifier.subscribe(this.notificationSubscriber));
}

_complete() {
this.destination.complete();
this.notificationSubscriber.unsubscribe();
}
}

class TakeUntilInnerSubscriber<T> extends Subscriber<T> {
constructor(destination: Subscriber<T>) {
super(destination);
constructor(protected destination: Subscriber<T>) {
super(null);
}

_next() {
this.destination.complete();
}

_error(e) {
this.destination.error(e);
}

_complete() {
}
}

0 comments on commit ec9505f

Please sign in to comment.