diff --git a/src/operator/subscribeOn.ts b/src/operator/subscribeOn.ts index f68b7a644a..ac0ec35fb4 100644 --- a/src/operator/subscribeOn.ts +++ b/src/operator/subscribeOn.ts @@ -1,9 +1,7 @@ -import { Operator } from '../Operator'; + import { IScheduler } from '../Scheduler'; -import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { TeardownLogic } from '../Subscription'; -import { SubscribeOnObservable } from '../observable/SubscribeOnObservable'; +import { subscribeOn as higherOrder } from '../operators'; /** * Asynchronously subscribes Observers to this Observable on the specified IScheduler. @@ -17,16 +15,5 @@ import { SubscribeOnObservable } from '../observable/SubscribeOnObservable'; * @owner Observable */ export function subscribeOn(this: Observable, scheduler: IScheduler, delay: number = 0): Observable { - return this.lift(new SubscribeOnOperator(scheduler, delay)); -} - -class SubscribeOnOperator implements Operator { - constructor(private scheduler: IScheduler, - private delay: number) { - } - call(subscriber: Subscriber, source: any): TeardownLogic { - return new SubscribeOnObservable( - source, this.delay, this.scheduler - ).subscribe(subscriber); - } + return higherOrder(scheduler, delay)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index e8be79d1a4..9837244c51 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -26,6 +26,7 @@ export { race } from './race'; export { reduce } from './reduce'; export { refCount } from './refCount'; export { scan } from './scan'; +export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; diff --git a/src/operators/subscribeOn.ts b/src/operators/subscribeOn.ts new file mode 100644 index 0000000000..b6d2146737 --- /dev/null +++ b/src/operators/subscribeOn.ts @@ -0,0 +1,35 @@ +import { Operator } from '../Operator'; +import { IScheduler } from '../Scheduler'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; +import { SubscribeOnObservable } from '../observable/SubscribeOnObservable'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Asynchronously subscribes Observers to this Observable on the specified IScheduler. + * + * + * + * @param {Scheduler} scheduler - The IScheduler to perform subscription actions on. + * @return {Observable} The source Observable modified so that its subscriptions happen on the specified IScheduler. + . + * @method subscribeOn + * @owner Observable + */ +export function subscribeOn(scheduler: IScheduler, delay: number = 0): MonoTypeOperatorFunction { + return function subscribeOnOperatorFunction(source: Observable): Observable { + return source.lift(new SubscribeOnOperator(scheduler, delay)); + }; +} + +class SubscribeOnOperator implements Operator { + constructor(private scheduler: IScheduler, + private delay: number) { + } + call(subscriber: Subscriber, source: any): TeardownLogic { + return new SubscribeOnObservable( + source, this.delay, this.scheduler + ).subscribe(subscriber); + } +}