-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(observeOn): add higher-order lettable version of observeOn
- Loading branch information
Showing
5 changed files
with
120 additions
and
63 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
import { Observable } from '../Observable'; | ||
import { IScheduler } from '../Scheduler'; | ||
import { Operator } from '../Operator'; | ||
import { PartialObserver } from '../Observer'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { Notification } from '../Notification'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { Action } from '../scheduler/Action'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* | ||
* Re-emits all notifications from source Observable with specified scheduler. | ||
* | ||
* <span class="informal">Ensure a specific scheduler is used, from outside of an Observable.</span> | ||
* | ||
* `observeOn` is an operator that accepts a scheduler as a first parameter, which will be used to reschedule | ||
* notifications emitted by the source Observable. It might be useful, if you do not have control over | ||
* internal scheduler of a given Observable, but want to control when its values are emitted nevertheless. | ||
* | ||
* Returned Observable emits the same notifications (nexted values, complete and error events) as the source Observable, | ||
* but rescheduled with provided scheduler. Note that this doesn't mean that source Observables internal | ||
* scheduler will be replaced in any way. Original scheduler still will be used, but when the source Observable emits | ||
* notification, it will be immediately scheduled again - this time with scheduler passed to `observeOn`. | ||
* An anti-pattern would be calling `observeOn` on Observable that emits lots of values synchronously, to split | ||
* that emissions into asynchronous chunks. For this to happen, scheduler would have to be passed into the source | ||
* Observable directly (usually into the operator that creates it). `observeOn` simply delays notifications a | ||
* little bit more, to ensure that they are emitted at expected moments. | ||
* | ||
* As a matter of fact, `observeOn` accepts second parameter, which specifies in milliseconds with what delay notifications | ||
* will be emitted. The main difference between {@link delay} operator and `observeOn` is that `observeOn` | ||
* will delay all notifications - including error notifications - while `delay` will pass through error | ||
* from source Observable immediately when it is emitted. In general it is highly recommended to use `delay` operator | ||
* for any kind of delaying of values in the stream, while using `observeOn` to specify which scheduler should be used | ||
* for notification emissions in general. | ||
* | ||
* @example <caption>Ensure values in subscribe are called just before browser repaint.</caption> | ||
* const intervals = Rx.Observable.interval(10); // Intervals are scheduled | ||
* // with async scheduler by default... | ||
* | ||
* intervals | ||
* .observeOn(Rx.Scheduler.animationFrame) // ...but we will observe on animationFrame | ||
* .subscribe(val => { // scheduler to ensure smooth animation. | ||
* someDiv.style.height = val + 'px'; | ||
* }); | ||
* | ||
* @see {@link delay} | ||
* | ||
* @param {IScheduler} scheduler Scheduler that will be used to reschedule notifications from source Observable. | ||
* @param {number} [delay] Number of milliseconds that states with what delay every notification should be rescheduled. | ||
* @return {Observable<T>} Observable that emits the same notifications as the source Observable, | ||
* but with provided scheduler. | ||
* | ||
* @method observeOn | ||
* @owner Observable | ||
*/ | ||
export function observeOn<T>(scheduler: IScheduler, delay: number = 0): MonoTypeOperatorFunction<T> { | ||
return function observeOnOperatorFunction(source: Observable<T>): Observable<T> { | ||
return source.lift(new ObserveOnOperator(scheduler, delay)); | ||
}; | ||
} | ||
|
||
export class ObserveOnOperator<T> implements Operator<T, T> { | ||
constructor(private scheduler: IScheduler, private delay: number = 0) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new ObserveOnSubscriber(subscriber, this.scheduler, this.delay)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
export class ObserveOnSubscriber<T> extends Subscriber<T> { | ||
static dispatch(this: Action<ObserveOnMessage>, arg: ObserveOnMessage) { | ||
const { notification, destination } = arg; | ||
notification.observe(destination); | ||
this.unsubscribe(); | ||
} | ||
|
||
constructor(destination: Subscriber<T>, | ||
private scheduler: IScheduler, | ||
private delay: number = 0) { | ||
super(destination); | ||
} | ||
|
||
private scheduleMessage(notification: Notification<any>): void { | ||
this.add(this.scheduler.schedule( | ||
ObserveOnSubscriber.dispatch, | ||
this.delay, | ||
new ObserveOnMessage(notification, this.destination) | ||
)); | ||
} | ||
|
||
protected _next(value: T): void { | ||
this.scheduleMessage(Notification.createNext(value)); | ||
} | ||
|
||
protected _error(err: any): void { | ||
this.scheduleMessage(Notification.createError(err)); | ||
} | ||
|
||
protected _complete(): void { | ||
this.scheduleMessage(Notification.createComplete()); | ||
} | ||
} | ||
|
||
export class ObserveOnMessage { | ||
constructor(public notification: Notification<any>, | ||
public destination: PartialObserver<any>) { | ||
} | ||
} |