forked from ReactiveX/rxjs
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(timestamp): add timestamp operator
closes ReactiveX#1515
- Loading branch information
Showing
5 changed files
with
205 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
import * as Rx from '../../dist/cjs/Rx.KitchenSink'; | ||
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; | ||
|
||
declare const rxTestScheduler: Rx.TestScheduler; | ||
const Observable = Rx.Observable; | ||
|
||
/** @test {timestamp} */ | ||
describe('Observable.prototype.timestamp', () => { | ||
asDiagram('timestamp')('should record the time stamp per each source elements', () => { | ||
const e1 = hot('--a--^b-c-----d--e--|'); | ||
const e1subs = '^ !'; | ||
const expected = '-w-x-----y--z--|'; | ||
const expectedValue = { w: 10, x: 30, y: 90, z: 120 }; | ||
|
||
const result = e1.timestamp(rxTestScheduler) | ||
.map(x => x.timestamp); | ||
|
||
expectObservable(result).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should record stamp if source emit elements', () => { | ||
const e1 = hot('--a--^b--c----d---e--|'); | ||
const e1subs = '^ !'; | ||
const expected = '-w--x----y---z--|'; | ||
|
||
const expectedValue = { | ||
w: new Rx.TimeStamp('b', 10), | ||
x: new Rx.TimeStamp('c', 40), | ||
y: new Rx.TimeStamp('d', 90), | ||
z: new Rx.TimeStamp('e', 130) | ||
}; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should completes without record stamp if source does not emits', () => { | ||
const e1 = hot('---------|'); | ||
const e1subs = '^ !'; | ||
const expected = '---------|'; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should complete immediately if source is empty', () => { | ||
const e1 = cold('|'); | ||
const e1subs = '(^!)'; | ||
const expected = '|'; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should record stamp then does not completes if source emits but not completes', () => { | ||
const e1 = hot('-a--b--'); | ||
const e1subs = '^ '; | ||
const expected = '-y--z--'; | ||
|
||
const expectedValue = { | ||
y: new Rx.TimeStamp('a', 10), | ||
z: new Rx.TimeStamp('b', 40) | ||
}; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should allow unsubscribing explicitly and early', () => { | ||
const e1 = hot('-a--b-----c---d---|'); | ||
const unsub = ' ! '; | ||
const e1subs = '^ ! '; | ||
const expected = '-y--z--- '; | ||
|
||
const expectedValue = { | ||
y: new Rx.TimeStamp('a', 10), | ||
z: new Rx.TimeStamp('b', 40) | ||
}; | ||
|
||
const result = e1.timestamp(rxTestScheduler); | ||
|
||
expectObservable(result, unsub).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not break unsubscription chains when result is unsubscribed explicitly', () => { | ||
const e1 = hot('-a--b-----c---d---|'); | ||
const e1subs = '^ ! '; | ||
const expected = '-y--z--- '; | ||
const unsub = ' ! '; | ||
|
||
const expectedValue = { | ||
y: new Rx.TimeStamp('a', 10), | ||
z: new Rx.TimeStamp('b', 40) | ||
}; | ||
|
||
const result = e1 | ||
.mergeMap(x => Observable.of(x)) | ||
.timestamp(rxTestScheduler) | ||
.mergeMap(x => Observable.of(x)); | ||
|
||
expectObservable(result, unsub).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should not completes if source never completes', () => { | ||
const e1 = cold('-'); | ||
const e1subs = '^'; | ||
const expected = '-'; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('raise error if source raises error', () => { | ||
const e1 = hot('---#'); | ||
const e1subs = '^ !'; | ||
const expected = '---#'; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should record stamp then raise error if source raises error after emit', () => { | ||
const e1 = hot('-a--b--#'); | ||
const e1subs = '^ !'; | ||
const expected = '-y--z--#'; | ||
|
||
const expectedValue = { | ||
y: new Rx.TimeStamp('a', 10), | ||
z: new Rx.TimeStamp('b', 40) | ||
}; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
|
||
it('should raise error if source immediately throws', () => { | ||
const e1 = cold('#'); | ||
const e1subs = '(^!)'; | ||
const expected = '#'; | ||
|
||
expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); | ||
expectSubscriptions(e1.subscriptions).toBe(e1subs); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
import {Observable} from '../../Observable'; | ||
import {timestamp, TimeStampSignature} from '../../operator/timestamp'; | ||
|
||
Observable.prototype.timestamp = timestamp; | ||
|
||
declare module '../../Observable' { | ||
interface Observable<T> { | ||
timestamp: TimeStampSignature<T>; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import {Operator} from '../Operator'; | ||
import {Observable} from '../Observable'; | ||
import {Subscriber} from '../Subscriber'; | ||
import {Scheduler} from '../Scheduler'; | ||
import {async} from '../scheduler/async'; | ||
|
||
/** | ||
* @param scheduler | ||
* @return {Observable<TimeStamp<any>>|WebSocketSubject<T>|Observable<T>} | ||
* @method timestamp | ||
* @owner Observable | ||
*/ | ||
export function timestamp<T>(scheduler: Scheduler = async): Observable<TimeStamp<T>> { | ||
return this.lift(new TimeStampOperator(scheduler)); | ||
} | ||
|
||
export interface TimeStampSignature<T> { | ||
(scheduler?: Scheduler): Observable<TimeStamp<T>>; | ||
} | ||
|
||
export class TimeStamp<T> { | ||
constructor(public value: T, public timestamp: number) { | ||
} | ||
}; | ||
|
||
class TimeStampOperator<T> implements Operator<T, TimeStamp<T>> { | ||
constructor(private scheduler: Scheduler) { | ||
} | ||
|
||
call(observer: Subscriber<TimeStamp<T>>): Subscriber<T> { | ||
return new TimeStampSubscriber(observer, this.scheduler); | ||
} | ||
} | ||
|
||
class TimeStampSubscriber<T> extends Subscriber<T> { | ||
constructor(destination: Subscriber<TimeStamp<T>>, private scheduler: Scheduler) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
const now = this.scheduler.now(); | ||
|
||
this.destination.next(new TimeStamp(value, now)); | ||
} | ||
} |