diff --git a/src/operator/throttleTime.ts b/src/operator/throttleTime.ts index cb6b7ca1e7..5175e824cc 100644 --- a/src/operator/throttleTime.ts +++ b/src/operator/throttleTime.ts @@ -1,10 +1,8 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; import { IScheduler } from '../Scheduler'; -import { Subscription, TeardownLogic } from '../Subscription'; import { async } from '../scheduler/async'; -import { Observable } from '../Observable'; -import { ThrottleConfig, defaultThrottleConfig } from './throttle'; +import { ThrottleConfig, defaultThrottleConfig } from '../operators/throttle'; +import { throttleTime as higherOrder } from '../operators/throttleTime'; /** * Emits a value from the source Observable, then ignores subsequent source @@ -49,75 +47,5 @@ export function throttleTime(this: Observable, duration: number, scheduler: IScheduler = async, config: ThrottleConfig = defaultThrottleConfig): Observable { - return this.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing)); -} - -class ThrottleTimeOperator implements Operator { - constructor(private duration: number, - private scheduler: IScheduler, - private leading: boolean, - private trailing: boolean) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe( - new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing) - ); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class ThrottleTimeSubscriber extends Subscriber { - private throttled: Subscription; - private _hasTrailingValue: boolean = false; - private _trailingValue: T = null; - - constructor(destination: Subscriber, - private duration: number, - private scheduler: IScheduler, - private leading: boolean, - private trailing: boolean) { - super(destination); - } - - protected _next(value: T) { - if (this.throttled) { - if (this.trailing) { - this._trailingValue = value; - this._hasTrailingValue = true; - } - } else { - this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); - if (this.leading) { - this.destination.next(value); - } - } - } - - clearThrottle() { - const throttled = this.throttled; - if (throttled) { - if (this.trailing && this._hasTrailingValue) { - this.destination.next(this._trailingValue); - this._trailingValue = null; - this._hasTrailingValue = false; - } - throttled.unsubscribe(); - this.remove(throttled); - this.throttled = null; - } - } -} - -interface DispatchArg { - subscriber: ThrottleTimeSubscriber; -} - -function dispatchNext(arg: DispatchArg) { - const { subscriber } = arg; - subscriber.clearThrottle(); + return higherOrder(duration, scheduler, config)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index e39d5b6424..b0fd4a0502 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -66,6 +66,7 @@ export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; export { tap } from './tap'; export { throttle } from './throttle'; +export { throttleTime } from './throttleTime'; export { timestamp } from './timestamp'; export { toArray } from './toArray'; export { window } from './window'; diff --git a/src/operators/throttleTime.ts b/src/operators/throttleTime.ts new file mode 100644 index 0000000000..d81920a38d --- /dev/null +++ b/src/operators/throttleTime.ts @@ -0,0 +1,123 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { IScheduler } from '../Scheduler'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { async } from '../scheduler/async'; +import { Observable } from '../Observable'; +import { ThrottleConfig, defaultThrottleConfig } from './throttle'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Emits a value from the source Observable, then ignores subsequent source + * values for `duration` milliseconds, then repeats this process. + * + * Lets a value pass, then ignores source values for the + * next `duration` milliseconds. + * + * + * + * `throttleTime` emits the source Observable values on the output Observable + * when its internal timer is disabled, and ignores source values when the timer + * is enabled. Initially, the timer is disabled. As soon as the first source + * value arrives, it is forwarded to the output Observable, and then the timer + * is enabled. After `duration` milliseconds (or the time unit determined + * internally by the optional `scheduler`) has passed, the timer is disabled, + * and this process repeats for the next source value. Optionally takes a + * {@link IScheduler} for managing timers. + * + * @example Emit clicks at a rate of at most one click per second + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.throttleTime(1000); + * result.subscribe(x => console.log(x)); + * + * @see {@link auditTime} + * @see {@link debounceTime} + * @see {@link delay} + * @see {@link sampleTime} + * @see {@link throttle} + * + * @param {number} duration Time to wait before emitting another value after + * emitting the last value, measured in milliseconds or the time unit determined + * internally by the optional `scheduler`. + * @param {Scheduler} [scheduler=async] The {@link IScheduler} to use for + * managing the timers that handle the throttling. + * @return {Observable} An Observable that performs the throttle operation to + * limit the rate of emissions from the source. + * @method throttleTime + * @owner Observable + */ +export function throttleTime(duration: number, + scheduler: IScheduler = async, + config: ThrottleConfig = defaultThrottleConfig): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new ThrottleTimeOperator(duration, scheduler, config.leading, config.trailing)); +} + +class ThrottleTimeOperator implements Operator { + constructor(private duration: number, + private scheduler: IScheduler, + private leading: boolean, + private trailing: boolean) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe( + new ThrottleTimeSubscriber(subscriber, this.duration, this.scheduler, this.leading, this.trailing) + ); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class ThrottleTimeSubscriber extends Subscriber { + private throttled: Subscription; + private _hasTrailingValue: boolean = false; + private _trailingValue: T = null; + + constructor(destination: Subscriber, + private duration: number, + private scheduler: IScheduler, + private leading: boolean, + private trailing: boolean) { + super(destination); + } + + protected _next(value: T) { + if (this.throttled) { + if (this.trailing) { + this._trailingValue = value; + this._hasTrailingValue = true; + } + } else { + this.add(this.throttled = this.scheduler.schedule(dispatchNext, this.duration, { subscriber: this })); + if (this.leading) { + this.destination.next(value); + } + } + } + + clearThrottle() { + const throttled = this.throttled; + if (throttled) { + if (this.trailing && this._hasTrailingValue) { + this.destination.next(this._trailingValue); + this._trailingValue = null; + this._hasTrailingValue = false; + } + throttled.unsubscribe(); + this.remove(throttled); + this.throttled = null; + } + } +} + +interface DispatchArg { + subscriber: ThrottleTimeSubscriber; +} + +function dispatchNext(arg: DispatchArg) { + const { subscriber } = arg; + subscriber.clearThrottle(); +}