Skip to content

Commit e2daefe

Browse files
committed
feat(audit): add higher-order lettable version of audit
1 parent 1304e85 commit e2daefe

File tree

3 files changed

+126
-75
lines changed

3 files changed

+126
-75
lines changed

src/operator/audit.ts

Lines changed: 3 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,6 @@
1-
import { Operator } from '../Operator';
2-
import { Subscriber } from '../Subscriber';
3-
import { Observable, SubscribableOrPromise } from '../Observable';
4-
import { Subscription, TeardownLogic } from '../Subscription';
51

6-
import { tryCatch } from '../util/tryCatch';
7-
import { errorObject } from '../util/errorObject';
8-
import { OuterSubscriber } from '../OuterSubscriber';
9-
import { subscribeToResult } from '../util/subscribeToResult';
2+
import { Observable, SubscribableOrPromise } from '../Observable';
3+
import { audit as higherOrder } from '../operators';
104

115
/**
126
* Ignores source values for a duration determined by another Observable, then
@@ -49,71 +43,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
4943
* @owner Observable
5044
*/
5145
export function audit<T>(this: Observable<T>, durationSelector: (value: T) => SubscribableOrPromise<any>): Observable<T> {
52-
return this.lift(new AuditOperator(durationSelector));
53-
}
54-
55-
class AuditOperator<T> implements Operator<T, T> {
56-
constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
57-
}
58-
59-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
60-
return source.subscribe(new AuditSubscriber<T, T>(subscriber, this.durationSelector));
61-
}
62-
}
63-
64-
/**
65-
* We need this JSDoc comment for affecting ESDoc.
66-
* @ignore
67-
* @extends {Ignored}
68-
*/
69-
class AuditSubscriber<T, R> extends OuterSubscriber<T, R> {
70-
71-
private value: T;
72-
private hasValue: boolean = false;
73-
private throttled: Subscription;
74-
75-
constructor(destination: Subscriber<T>,
76-
private durationSelector: (value: T) => SubscribableOrPromise<any>) {
77-
super(destination);
78-
}
79-
80-
protected _next(value: T): void {
81-
this.value = value;
82-
this.hasValue = true;
83-
if (!this.throttled) {
84-
const duration = tryCatch(this.durationSelector)(value);
85-
if (duration === errorObject) {
86-
this.destination.error(errorObject.e);
87-
} else {
88-
const innerSubscription = subscribeToResult(this, duration);
89-
if (innerSubscription.closed) {
90-
this.clearThrottle();
91-
} else {
92-
this.add(this.throttled = innerSubscription);
93-
}
94-
}
95-
}
96-
}
97-
98-
clearThrottle() {
99-
const { value, hasValue, throttled } = this;
100-
if (throttled) {
101-
this.remove(throttled);
102-
this.throttled = null;
103-
throttled.unsubscribe();
104-
}
105-
if (hasValue) {
106-
this.value = null;
107-
this.hasValue = false;
108-
this.destination.next(value);
109-
}
110-
}
111-
112-
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
113-
this.clearThrottle();
114-
}
115-
116-
notifyComplete(): void {
117-
this.clearThrottle();
118-
}
46+
return higherOrder(durationSelector)(this);
11947
}

src/operators/audit.ts

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { Operator } from '../Operator';
2+
import { Subscriber } from '../Subscriber';
3+
import { Observable, SubscribableOrPromise } from '../Observable';
4+
import { Subscription, TeardownLogic } from '../Subscription';
5+
6+
import { tryCatch } from '../util/tryCatch';
7+
import { errorObject } from '../util/errorObject';
8+
import { OuterSubscriber } from '../OuterSubscriber';
9+
import { subscribeToResult } from '../util/subscribeToResult';
10+
import { MonoTypeOperatorFunction } from '../interfaces';
11+
12+
/**
13+
* Ignores source values for a duration determined by another Observable, then
14+
* emits the most recent value from the source Observable, then repeats this
15+
* process.
16+
*
17+
* <span class="informal">It's like {@link auditTime}, but the silencing
18+
* duration is determined by a second Observable.</span>
19+
*
20+
* <img src="./img/audit.png" width="100%">
21+
*
22+
* `audit` is similar to `throttle`, but emits the last value from the silenced
23+
* time window, instead of the first value. `audit` emits the most recent value
24+
* from the source Observable on the output Observable as soon as its internal
25+
* timer becomes disabled, and ignores source values while the timer is enabled.
26+
* Initially, the timer is disabled. As soon as the first source value arrives,
27+
* the timer is enabled by calling the `durationSelector` function with the
28+
* source value, which returns the "duration" Observable. When the duration
29+
* Observable emits a value or completes, the timer is disabled, then the most
30+
* recent source value is emitted on the output Observable, and this process
31+
* repeats for the next source value.
32+
*
33+
* @example <caption>Emit clicks at a rate of at most one click per second</caption>
34+
* var clicks = Rx.Observable.fromEvent(document, 'click');
35+
* var result = clicks.audit(ev => Rx.Observable.interval(1000));
36+
* result.subscribe(x => console.log(x));
37+
*
38+
* @see {@link auditTime}
39+
* @see {@link debounce}
40+
* @see {@link delayWhen}
41+
* @see {@link sample}
42+
* @see {@link throttle}
43+
*
44+
* @param {function(value: T): SubscribableOrPromise} durationSelector A function
45+
* that receives a value from the source Observable, for computing the silencing
46+
* duration, returned as an Observable or a Promise.
47+
* @return {Observable<T>} An Observable that performs rate-limiting of
48+
* emissions from the source Observable.
49+
* @method audit
50+
* @owner Observable
51+
*/
52+
export function audit<T>(durationSelector: (value: T) => SubscribableOrPromise<any>): MonoTypeOperatorFunction<T> {
53+
return function auditOperatorFunction(source: Observable<T>) {
54+
return source.lift(new AuditOperator(durationSelector));
55+
};
56+
}
57+
58+
class AuditOperator<T> implements Operator<T, T> {
59+
constructor(private durationSelector: (value: T) => SubscribableOrPromise<any>) {
60+
}
61+
62+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
63+
return source.subscribe(new AuditSubscriber<T, T>(subscriber, this.durationSelector));
64+
}
65+
}
66+
67+
/**
68+
* We need this JSDoc comment for affecting ESDoc.
69+
* @ignore
70+
* @extends {Ignored}
71+
*/
72+
class AuditSubscriber<T, R> extends OuterSubscriber<T, R> {
73+
74+
private value: T;
75+
private hasValue: boolean = false;
76+
private throttled: Subscription;
77+
78+
constructor(destination: Subscriber<T>,
79+
private durationSelector: (value: T) => SubscribableOrPromise<any>) {
80+
super(destination);
81+
}
82+
83+
protected _next(value: T): void {
84+
this.value = value;
85+
this.hasValue = true;
86+
if (!this.throttled) {
87+
const duration = tryCatch(this.durationSelector)(value);
88+
if (duration === errorObject) {
89+
this.destination.error(errorObject.e);
90+
} else {
91+
const innerSubscription = subscribeToResult(this, duration);
92+
if (innerSubscription.closed) {
93+
this.clearThrottle();
94+
} else {
95+
this.add(this.throttled = innerSubscription);
96+
}
97+
}
98+
}
99+
}
100+
101+
clearThrottle() {
102+
const { value, hasValue, throttled } = this;
103+
if (throttled) {
104+
this.remove(throttled);
105+
this.throttled = null;
106+
throttled.unsubscribe();
107+
}
108+
if (hasValue) {
109+
this.value = null;
110+
this.hasValue = false;
111+
this.destination.next(value);
112+
}
113+
}
114+
115+
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
116+
this.clearThrottle();
117+
}
118+
119+
notifyComplete(): void {
120+
this.clearThrottle();
121+
}
122+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
export { audit } from './audit';
12
export { catchError } from './catchError';
23
export { concat } from './concat';
34
export { concatAll } from './concatAll';

0 commit comments

Comments
 (0)