-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(sample): add higher-order lettable version of sample
- Loading branch information
Showing
3 changed files
with
94 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
import { Operator } from '../Operator'; | ||
import { Observable } from '../Observable'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
|
||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits the most recently emitted value from the source Observable whenever | ||
* another Observable, the `notifier`, emits. | ||
* | ||
* <span class="informal">It's like {@link sampleTime}, but samples whenever | ||
* the `notifier` Observable emits something.</span> | ||
* | ||
* <img src="./img/sample.png" width="100%"> | ||
* | ||
* Whenever the `notifier` Observable emits a value or completes, `sample` | ||
* looks at the source Observable and emits whichever value it has most recently | ||
* emitted since the previous sampling, unless the source has not emitted | ||
* anything since the previous sampling. The `notifier` is subscribed to as soon | ||
* as the output Observable is subscribed. | ||
* | ||
* @example <caption>On every click, sample the most recent "seconds" timer</caption> | ||
* var seconds = Rx.Observable.interval(1000); | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = seconds.sample(clicks); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link audit} | ||
* @see {@link debounce} | ||
* @see {@link sampleTime} | ||
* @see {@link throttle} | ||
* | ||
* @param {Observable<any>} notifier The Observable to use for sampling the | ||
* source Observable. | ||
* @return {Observable<T>} An Observable that emits the results of sampling the | ||
* values emitted by the source Observable whenever the notifier Observable | ||
* emits value or completes. | ||
* @method sample | ||
* @owner Observable | ||
*/ | ||
export function sample<T>(notifier: Observable<any>): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => source.lift(new SampleOperator(notifier)); | ||
} | ||
|
||
class SampleOperator<T> implements Operator<T, T> { | ||
constructor(private notifier: Observable<any>) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
const sampleSubscriber = new SampleSubscriber(subscriber); | ||
const subscription = source.subscribe(sampleSubscriber); | ||
subscription.add(subscribeToResult(sampleSubscriber, this.notifier)); | ||
return subscription; | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class SampleSubscriber<T, R> extends OuterSubscriber<T, R> { | ||
private value: T; | ||
private hasValue: boolean = false; | ||
|
||
protected _next(value: T) { | ||
this.value = value; | ||
this.hasValue = true; | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: R, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, R>): void { | ||
this.emitValue(); | ||
} | ||
|
||
notifyComplete(): void { | ||
this.emitValue(); | ||
} | ||
|
||
emitValue() { | ||
if (this.hasValue) { | ||
this.hasValue = false; | ||
this.destination.next(this.value); | ||
} | ||
} | ||
} |