From 27f100be56ac72d50ee273f6345a45142fa3be07 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Wed, 23 Mar 2016 00:01:26 -0700 Subject: [PATCH] feat(timestamp): add timestamp operator closes #1515 --- doc/operators.md | 1 + spec/operators/timestamp-spec.ts | 147 +++++++++++++++++++++++++++++++ src/Rx.KitchenSink.ts | 2 + src/add/operator/timestamp.ts | 10 +++ src/operator/timestamp.ts | 45 ++++++++++ 5 files changed, 205 insertions(+) create mode 100644 spec/operators/timestamp-spec.ts create mode 100644 src/add/operator/timestamp.ts create mode 100644 src/operator/timestamp.ts diff --git a/doc/operators.md b/doc/operators.md index b21a4067add..3fea44d57ee 100644 --- a/doc/operators.md +++ b/doc/operators.md @@ -227,6 +227,7 @@ There are operators for different purposes, and they may be categorized as: crea - [`observeOn`](../class/es6/Observable.js~Observable.html#instance-method-observeOn) - [`subscribeOn`](../class/es6/Observable.js~Observable.html#instance-method-subscribeOn) - [`timeInterval`](../class/es6/Observable.js~Observable.html#instance-method-timeInterval) +- [`timestamp`](../class/es6/Observable.js~Observable.html#instance-method-timestamp) - [`timeout`](../class/es6/Observable.js~Observable.html#instance-method-timeout) - [`timeoutWith`](../class/es6/Observable.js~Observable.html#instance-method-timeoutWith) - [`toArray`](../class/es6/Observable.js~Observable.html#instance-method-toArray) diff --git a/spec/operators/timestamp-spec.ts b/spec/operators/timestamp-spec.ts new file mode 100644 index 00000000000..b533f3c083b --- /dev/null +++ b/spec/operators/timestamp-spec.ts @@ -0,0 +1,147 @@ +import * as Rx from '../../dist/cjs/Rx.KitchenSink'; +declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; + +declare const rxTestScheduler: Rx.TestScheduler; +const Observable = Rx.Observable; + +/** @test {timestamp} */ +describe('Observable.prototype.timestamp', () => { + asDiagram('timestamp')('should record the time stamp per each source elements', () => { + const e1 = hot('--a--^b-c-----d--e--|'); + const e1subs = '^ !'; + const expected = '-w-x-----y--z--|'; + const expectedValue = { w: 10, x: 30, y: 90, z: 120 }; + + const result = e1.timestamp(rxTestScheduler) + .map(x => x.timestamp); + + expectObservable(result).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should record stamp if source emit elements', () => { + const e1 = hot('--a--^b--c----d---e--|'); + const e1subs = '^ !'; + const expected = '-w--x----y---z--|'; + + const expectedValue = { + w: new Rx.TimeStamp('b', 10), + x: new Rx.TimeStamp('c', 40), + y: new Rx.TimeStamp('d', 90), + z: new Rx.TimeStamp('e', 130) + }; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should completes without record stamp if source does not emits', () => { + const e1 = hot('---------|'); + const e1subs = '^ !'; + const expected = '---------|'; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should complete immediately if source is empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '|'; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should record stamp then does not completes if source emits but not completes', () => { + const e1 = hot('-a--b--'); + const e1subs = '^ '; + const expected = '-y--z--'; + + const expectedValue = { + y: new Rx.TimeStamp('a', 10), + z: new Rx.TimeStamp('b', 40) + }; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', () => { + const e1 = hot('-a--b-----c---d---|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '-y--z--- '; + + const expectedValue = { + y: new Rx.TimeStamp('a', 10), + z: new Rx.TimeStamp('b', 40) + }; + + const result = e1.timestamp(rxTestScheduler); + + expectObservable(result, unsub).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', () => { + const e1 = hot('-a--b-----c---d---|'); + const e1subs = '^ ! '; + const expected = '-y--z--- '; + const unsub = ' ! '; + + const expectedValue = { + y: new Rx.TimeStamp('a', 10), + z: new Rx.TimeStamp('b', 40) + }; + + const result = e1 + .mergeMap(x => Observable.of(x)) + .timestamp(rxTestScheduler) + .mergeMap(x => Observable.of(x)); + + expectObservable(result, unsub).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not completes if source never completes', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('raise error if source raises error', () => { + const e1 = hot('---#'); + const e1subs = '^ !'; + const expected = '---#'; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should record stamp then raise error if source raises error after emit', () => { + const e1 = hot('-a--b--#'); + const e1subs = '^ !'; + const expected = '-y--z--#'; + + const expectedValue = { + y: new Rx.TimeStamp('a', 10), + z: new Rx.TimeStamp('b', 40) + }; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected, expectedValue); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should raise error if source immediately throws', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + expectObservable(e1.timestamp(rxTestScheduler)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); \ No newline at end of file diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 6b1369c3a1e..11d5c84b352 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -20,7 +20,9 @@ import './add/operator/mergeScan'; import './add/operator/min'; import './add/operator/pairwise'; import './add/operator/timeInterval'; +import './add/operator/timestamp'; export {TimeInterval} from './operator/timeInterval'; +export {TimeStamp} from './operator/timestamp'; export {TestScheduler} from './testing/TestScheduler'; export {VirtualTimeScheduler} from './scheduler/VirtualTimeScheduler'; \ No newline at end of file diff --git a/src/add/operator/timestamp.ts b/src/add/operator/timestamp.ts new file mode 100644 index 00000000000..36436c84dc0 --- /dev/null +++ b/src/add/operator/timestamp.ts @@ -0,0 +1,10 @@ +import {Observable} from '../../Observable'; +import {timestamp, TimeStampSignature} from '../../operator/timestamp'; + +Observable.prototype.timestamp = timestamp; + +declare module '../../Observable' { + interface Observable { + timestamp: TimeStampSignature; + } +} \ No newline at end of file diff --git a/src/operator/timestamp.ts b/src/operator/timestamp.ts new file mode 100644 index 00000000000..41527e45f0c --- /dev/null +++ b/src/operator/timestamp.ts @@ -0,0 +1,45 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Scheduler} from '../Scheduler'; +import {async} from '../scheduler/async'; + +/** + * @param scheduler + * @return {Observable>|WebSocketSubject|Observable} + * @method timestamp + * @owner Observable + */ +export function timestamp(scheduler: Scheduler = async): Observable> { + return this.lift(new TimeStampOperator(scheduler)); +} + +export interface TimeStampSignature { + (scheduler?: Scheduler): Observable>; +} + +export class TimeStamp { + constructor(public value: T, public timestamp: number) { + } +}; + +class TimeStampOperator implements Operator> { + constructor(private scheduler: Scheduler) { + } + + call(observer: Subscriber>): Subscriber { + return new TimeStampSubscriber(observer, this.scheduler); + } +} + +class TimeStampSubscriber extends Subscriber { + constructor(destination: Subscriber>, private scheduler: Scheduler) { + super(destination); + } + + protected _next(value: T): void { + const now = this.scheduler.now(); + + this.destination.next(new TimeStamp(value, now)); + } +} \ No newline at end of file