-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(windowWhen): add higher-order lettable version of windowWhen
- Loading branch information
Showing
3 changed files
with
144 additions
and
97 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,140 @@ | ||
import { Operator } from '../Operator'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Observable } from '../Observable'; | ||
import { Subject } from '../Subject'; | ||
import { Subscription } from '../Subscription'; | ||
import { tryCatch } from '../util/tryCatch'; | ||
import { errorObject } from '../util/errorObject'; | ||
import { OuterSubscriber } from '../OuterSubscriber'; | ||
import { InnerSubscriber } from '../InnerSubscriber'; | ||
import { subscribeToResult } from '../util/subscribeToResult'; | ||
import { OperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Branch out the source Observable values as a nested Observable using a | ||
* factory function of closing Observables to determine when to start a new | ||
* window. | ||
* | ||
* <span class="informal">It's like {@link bufferWhen}, but emits a nested | ||
* Observable instead of an array.</span> | ||
* | ||
* <img src="./img/windowWhen.png" width="100%"> | ||
* | ||
* Returns an Observable that emits windows of items it collects from the source | ||
* Observable. The output Observable emits connected, non-overlapping windows. | ||
* It emits the current window and opens a new one whenever the Observable | ||
* produced by the specified `closingSelector` function emits an item. The first | ||
* window is opened immediately when subscribing to the output Observable. | ||
* | ||
* @example <caption>Emit only the first two clicks events in every window of [1-5] random seconds</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = clicks | ||
* .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000)) | ||
* .map(win => win.take(2)) // each window has at most 2 emissions | ||
* .mergeAll(); // flatten the Observable-of-Observables | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link window} | ||
* @see {@link windowCount} | ||
* @see {@link windowTime} | ||
* @see {@link windowToggle} | ||
* @see {@link bufferWhen} | ||
* | ||
* @param {function(): Observable} closingSelector A function that takes no | ||
* arguments and returns an Observable that signals (on either `next` or | ||
* `complete`) when to close the previous window and start a new one. | ||
* @return {Observable<Observable<T>>} An observable of windows, which in turn | ||
* are Observables. | ||
* @method windowWhen | ||
* @owner Observable | ||
*/ | ||
export function windowWhen<T>(closingSelector: () => Observable<any>): OperatorFunction<T, Observable<T>> { | ||
return function windowWhenOperatorFunction(source: Observable<T>) { | ||
return source.lift(new WindowOperator<T>(closingSelector)); | ||
}; | ||
} | ||
|
||
class WindowOperator<T> implements Operator<T, Observable<T>> { | ||
constructor(private closingSelector: () => Observable<any>) { | ||
} | ||
|
||
call(subscriber: Subscriber<Observable<T>>, source: any): any { | ||
return source.subscribe(new WindowSubscriber(subscriber, this.closingSelector)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class WindowSubscriber<T> extends OuterSubscriber<T, any> { | ||
private window: Subject<T>; | ||
private closingNotification: Subscription; | ||
|
||
constructor(protected destination: Subscriber<Observable<T>>, | ||
private closingSelector: () => Observable<any>) { | ||
super(destination); | ||
this.openWindow(); | ||
} | ||
|
||
notifyNext(outerValue: T, innerValue: any, | ||
outerIndex: number, innerIndex: number, | ||
innerSub: InnerSubscriber<T, any>): void { | ||
this.openWindow(innerSub); | ||
} | ||
|
||
notifyError(error: any, innerSub: InnerSubscriber<T, any>): void { | ||
this._error(error); | ||
} | ||
|
||
notifyComplete(innerSub: InnerSubscriber<T, any>): void { | ||
this.openWindow(innerSub); | ||
} | ||
|
||
protected _next(value: T): void { | ||
this.window.next(value); | ||
} | ||
|
||
protected _error(err: any): void { | ||
this.window.error(err); | ||
this.destination.error(err); | ||
this.unsubscribeClosingNotification(); | ||
} | ||
|
||
protected _complete(): void { | ||
this.window.complete(); | ||
this.destination.complete(); | ||
this.unsubscribeClosingNotification(); | ||
} | ||
|
||
private unsubscribeClosingNotification(): void { | ||
if (this.closingNotification) { | ||
this.closingNotification.unsubscribe(); | ||
} | ||
} | ||
|
||
private openWindow(innerSub: InnerSubscriber<T, any> = null): void { | ||
if (innerSub) { | ||
this.remove(innerSub); | ||
innerSub.unsubscribe(); | ||
} | ||
|
||
const prevWindow = this.window; | ||
if (prevWindow) { | ||
prevWindow.complete(); | ||
} | ||
|
||
const window = this.window = new Subject<T>(); | ||
this.destination.next(window); | ||
|
||
const closingNotifier = tryCatch(this.closingSelector)(); | ||
if (closingNotifier === errorObject) { | ||
const err = errorObject.e; | ||
this.destination.error(err); | ||
this.window.error(err); | ||
} else { | ||
this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); | ||
} | ||
} | ||
} |