From e93bffcdf573c05dab26e47ff09d3138973724d5 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 14 Dec 2015 09:26:31 -0800 Subject: [PATCH] feat(sample): readd `sample` operator Add an implementation of `sample` that behaves like RxJS 4 BREAKING CHANGE: `sample` behavior returned to RxJS 4 behavior --- spec/operators/sample-spec.js | 180 ++++++++++++++++++++++++++++++++++ src/CoreOperators.ts | 1 + src/Observable.ts | 1 + src/Rx.KitchenSink.ts | 1 + src/Rx.ts | 1 + src/add/operator/sample.ts | 5 + src/operator/sample.ts | 56 +++++++++++ 7 files changed, 245 insertions(+) create mode 100644 spec/operators/sample-spec.js create mode 100644 src/add/operator/sample.ts create mode 100644 src/operator/sample.ts diff --git a/spec/operators/sample-spec.js b/spec/operators/sample-spec.js new file mode 100644 index 0000000000..68808f5b88 --- /dev/null +++ b/spec/operators/sample-spec.js @@ -0,0 +1,180 @@ +/* globals describe, it, expect, hot, expectObservable,, expectSubscriptions */ +var Rx = require('../../dist/cjs/Rx.KitchenSink'); +var Observable = Rx.Observable; + +describe('Observable.prototype.sample', function () { + it('should get samples when the notifier emits', function () { + var e1 = hot('----a-^--b----c----d----e----f----| '); + var e1subs = '^ ! '; + var e2 = hot( '-----x----------x----------x----------|'); + var e2subs = '^ ! '; + var expected = '-----b----------d----------f| '; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should sample nothing if source has not nexted yet', function () { + var e1 = hot('----a-^-------b----|'); + var e1subs = '^ !'; + var e2 = hot( '-----x-------|'); + var e2subs = '^ !'; + var expected = '-------------|'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not complete when the notifier completes, nor should it emit', function () { + var e1 = hot('----a----b----c----d----e----f----'); + var e1subs = '^ '; + var e2 = hot('------x-| '); + var e2subs = '^ ! '; + var expected = '------a---------------------------'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should complete only when the source completes, if notifier completes early', function () { + var e1 = hot('----a----b----c----d----e----f---|'); + var e1subs = '^ !'; + var e2 = hot('------x-| '); + var e2subs = '^ ! '; + var expected = '------a--------------------------|'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow unsubscribing explicitly and early', function () { + var e1 = hot('----a-^--b----c----d----e----f----| '); + var unsub = ' ! '; + var e1subs = '^ ! '; + var e2 = hot( '-----x----------x----------x----------|'); + var e2subs = '^ ! '; + var expected = '-----b--------- '; + + expectObservable(e1.sample(e2), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not break unsubscription chains when result is unsubscribed explicitly', function () { + var e1 = hot('----a-^--b----c----d----e----f----| '); + var e1subs = '^ ! '; + var e2 = hot( '-----x----------x----------x----------|'); + var e2subs = '^ ! '; + var expected = '-----b--------- '; + var unsub = ' ! '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .sample(e2) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should only sample when a new value arrives, even if it is the same value', function () { + var e1 = hot('----a----b----c----c----e----f----| '); + var e1subs = '^ ! '; + var e2 = hot('------x-x------xx-x---x----x--------|'); + var e2subs = '^ ! '; + var expected = '------a--------c------c----e------| '; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should raise error if source raises error', function () { + var e1 = hot('----a-^--b----c----d----# '); + var e1subs = '^ ! '; + var e2 = hot( '-----x----------x----------x----------|'); + var e2subs = '^ ! '; + var expected = '-----b----------d-# '; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should completes if source does not emits', function () { + var e1 = hot('|'); + var e2 = hot('------x-------|'); + var expected = '|'; + var e1subs = '(^!)'; + var e2subs = '(^!)'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should raise error if source throws immediately', function () { + var e1 = hot('#'); + var e2 = hot('------x-------|'); + var expected = '#'; + var e1subs = '(^!)'; + var e2subs = '(^!)'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should raise error if notification raises error', function () { + var e1 = hot('--a-----|'); + var e2 = hot('----#'); + var expected = '----#'; + var e1subs = '^ !'; + var e2subs = '^ !'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not completes if source does not complete', function () { + var e1 = hot('-'); + var e1subs = '^ '; + var e2 = hot('------x-------|'); + var e2subs = '^ !'; + var expected = '-'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should sample only until source completes', function () { + var e1 = hot('----a----b----c----d-|'); + var e1subs = '^ !'; + var e2 = hot('-----------x----------x------------|'); + var e2subs = '^ !'; + var expected = '-----------b---------|'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should complete sampling if sample observable completes', function () { + var e1 = hot('----a----b----c----d-|'); + var e1subs = '^ !'; + var e2 = hot('|'); + var e2subs = '(^!)'; + var expected = '---------------------|'; + + expectObservable(e1.sample(e2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); +}); \ No newline at end of file diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index b08c14b552..cf86ebac1e 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -62,6 +62,7 @@ export interface CoreOperators { repeat?: (count?: number) => Observable; retry?: (count?: number) => Observable; retryWhen?: (notifier: (errors: Observable) => Observable) => Observable; + sample?: (notifier: Observable) => Observable; scan?: (project: (acc: R, x: T) => R, acc?: R) => Observable; share?: () => Observable; single?: (predicate?: (value: T, index: number) => boolean) => Observable; diff --git a/src/Observable.ts b/src/Observable.ts index a4329bd386..3c39b3358a 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -246,6 +246,7 @@ export class Observable implements CoreOperators { repeat: (count?: number) => Observable; retry: (count?: number) => Observable; retryWhen: (notifier: (errors: Observable) => Observable) => Observable; + sample: (notifier: Observable) => Observable; scan: (accumulator: (acc: R, x: T) => R, seed?: T | R) => Observable; share: () => Observable; single: (predicate?: (value: T, index: number) => boolean) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 8e8940f9fa..0106db03b1 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -102,6 +102,7 @@ import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; import './add/operator/retryWhen'; +import './add/operator/sample'; import './add/operator/scan'; import './add/operator/share'; import './add/operator/single'; diff --git a/src/Rx.ts b/src/Rx.ts index 33bebf53a5..3215d53dbb 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -74,6 +74,7 @@ import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; import './add/operator/retryWhen'; +import './add/operator/sample'; import './add/operator/scan'; import './add/operator/share'; import './add/operator/single'; diff --git a/src/add/operator/sample.ts b/src/add/operator/sample.ts new file mode 100644 index 0000000000..9b92e046f1 --- /dev/null +++ b/src/add/operator/sample.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../Observable'; +import {sample} from '../../operator/sample'; +Observable.prototype.sample = sample; + +export var _void: void; \ No newline at end of file diff --git a/src/operator/sample.ts b/src/operator/sample.ts new file mode 100644 index 0000000000..96c13e461d --- /dev/null +++ b/src/operator/sample.ts @@ -0,0 +1,56 @@ +import {Observable} from '../Observable'; +import {Operator} from '../Operator'; +import {Subscriber} from '../Subscriber'; + +export function sample(notifier: Observable): Observable { + return this.lift(new SampleOperator(notifier)); +} + +class SampleOperator implements Operator { + constructor(private notifier: Observable) { + } + + call(subscriber: Subscriber) { + return new SampleSubscriber(subscriber, this.notifier); + } +} + +class SampleSubscriber extends Subscriber { + private lastValue: T; + private hasValue: boolean = false; + + constructor(destination: Subscriber, private notifier: Observable) { + super(destination); + this.add(notifier._subscribe(new SampleNotificationSubscriber(this))); + } + + _next(value: T) { + this.lastValue = value; + this.hasValue = true; + } + + notifyNext() { + if (this.hasValue) { + this.hasValue = false; + this.destination.next(this.lastValue); + } + } +} + +class SampleNotificationSubscriber extends Subscriber { + constructor(private parent: SampleSubscriber) { + super(null); + } + + _next() { + this.parent.notifyNext(); + } + + _error(err: any) { + this.parent.error(err); + } + + _complete() { + //noop + } +}