Skip to content

Commit

Permalink
refactor(switchLatest): update to use refactored flatMap
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Sep 15, 2015
1 parent 68ec153 commit 8cd757f
Showing 1 changed file with 13 additions and 33 deletions.
46 changes: 13 additions & 33 deletions src/operators/switchLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,50 +4,30 @@ import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

import { FlatMapOperator, FlatMapSubscriber } from './flatMap-support';
import { FlatMapSubscriber } from './flatMap-support';

export default function switchLatest<T, R>(project: (x: T, ix: number) => Observable<any>,
projectResult?: (x: T, y: any, ix: number, iy: number) => R): Observable<R>{
return this.lift(new SwitchLatestOperator(project, projectResult));
export default function switchLatest<T, R, R2>(project: (value: T, index: number) => Observable<R>,
resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2): Observable<R>{
return this.lift(new SwitchLatestOperator(project, resultSelector));
}

class SwitchLatestOperator<T, R> extends FlatMapOperator<T, R> {

constructor(project: (x: T, ix: number) => Observable<any>,
projectResult?: (x: T, y: any, ix: number, iy: number) => R) {
super(project, projectResult, 1);
class SwitchLatestOperator<T, R, R2> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<R>,
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new SwitchLatestSubscriber(subscriber, this.project, this.projectResult);
return new SwitchLatestSubscriber(subscriber, this.project, this.resultSelector);
}
}

class SwitchLatestSubscriber<T, R> extends FlatMapSubscriber<T, R> {
class SwitchLatestSubscriber<T, R, R2> extends FlatMapSubscriber<T, R, R2> {

innerSubscription: Subscription<T>;

constructor(destination: Observer<R>,
project: (x: T, ix: number) => Observable<any>,
projectResult?: (x: T, y: any, ix: number, iy: number) => R) {
super(destination, 1, project, projectResult);
}

_buffer(value) {
const active = this.active;
if(active > 0) {
this.active = active - 1;
const inner = this.innerSubscription;
if(inner) {
inner.unsubscribe()
}
}
this._next(value);
}

_subscribeInner(observable, value, index) {
this.innerSubscription = new Subscription();
this.innerSubscription.add(super._subscribeInner(observable, value, index));
return this.innerSubscription;
constructor(destination: Observer<T>,
project: (value: T, index: number) => Observable<R>,
resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {
super(destination, project, resultSelector, 1);
}
}

0 comments on commit 8cd757f

Please sign in to comment.