-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(skipUntil): add higher-order lettable version of skipUntil
- Loading branch information
Showing
3 changed files
with
80 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Observable } from '../Observable'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. | ||
* | ||
* <img src="./img/skipUntil.png" width="100%"> | ||
* | ||
* @param {Observable} notifier - The second Observable that has to emit an item before the source Observable's elements begin to | ||
* be mirrored by the resulting Observable. | ||
* @return {Observable<T>} An Observable that skips items from the source Observable until the second Observable emits | ||
* an item, then emits the remaining items. | ||
* @method skipUntil | ||
* @owner Observable | ||
*/ | ||
export function skipUntil<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => source.lift(new SkipUntilOperator(notifier)); | ||
} | ||
|
||
class SkipUntilOperator<T> implements Operator<T, T> { | ||
constructor(private notifier: Observable<any>) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new SkipUntilSubscriber(subscriber, this.notifier)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class SkipUntilSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
|
||
private hasValue: boolean = false; | ||
private isInnerStopped: boolean = false; | ||
|
||
constructor(destination: Subscriber<any>, | ||
notifier: Observable<any>) { | ||
super(destination); | ||
this.add(subscribeToResult(this, notifier)); | ||
} | ||
|
||
protected _next(value: T) { | ||
if (this.hasValue) { | ||
super._next(value); | ||
} | ||
} | ||
|
||
protected _complete() { | ||
if (this.isInnerStopped) { | ||
super._complete(); | ||
} else { | ||
this.unsubscribe(); | ||
} | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: R, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
this.hasValue = true; | ||
} | ||
|
||
notifyComplete(): void { | ||
this.isInnerStopped = true; | ||
if (this.isStopped) { | ||
super._complete(); | ||
} | ||
} | ||
} |