Skip to content

Commit

Permalink
feat(shareBehavior): add shareBehavior and its tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 27, 2015
1 parent 40e9757 commit 97ff1ec
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 0 deletions.
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/sharebehavior.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldShareBehaviorWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.shareValue(0);
var newShareBehaviorWithImmediateScheduler = RxNew.Observable.range(0, 25)
.shareBehavior(0);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old shareBehavior with immediate scheduler', function () {
oldShareBehaviorWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new shareBehavior with immediate scheduler', function () {
newShareBehaviorWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
151 changes: 151 additions & 0 deletions spec/operators/shareBehavior-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/* globals describe, it, expect, expectObservable, hot */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.prototype.shareBehavior', function () {
it('should share a single subscription', function () {
var subscriptionCount = 0;
var obs = new Observable(function (observer) {
subscriptionCount++;
});

var source = obs.shareBehavior(0);

expect(subscriptionCount).toBe(0);

source.subscribe();
source.subscribe();
source.subscribe();

expect(subscriptionCount).toBe(1);
});

it('should replay 1 event from the past to a late subscriber', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
});

var hot = source.shareBehavior(0);

expect(results1).toEqual([]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results1.push(x);
});

expect(results1).toEqual([0, 1, 2, 3, 4]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([0, 1, 2, 3, 4]);
expect(results2).toEqual([4]);
expect(subscriptions).toBe(1);
done();
});

it('should replay the default value if no next() ever emits', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
});

var hot = source.shareBehavior(0);

expect(results1).toEqual([]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results1.push(x);
});

expect(results1).toEqual([0]);
expect(results2).toEqual([]);

hot.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([0]);
expect(results2).toEqual([0]);
expect(subscriptions).toBe(1);
done();
});

it('should unsubscribe from the source as soon as no more subscribers on shared', function () {
var e1 = cold( '--a---b-c--d--e--|');
var e1subs = '^ ! ';
var expected1 = 'x-a---b- ';
var unsub1 = ' ! ';
var expected2 = 'x-a---b-c--d- ';
var unsub2 = ' ! ';

var shared = e1.shareBehavior('x');
var observer1 = shared.do();
var observer2 = shared.do();

expectObservable(observer1, unsub1).toBe(expected1);
expectObservable(observer2, unsub2).toBe(expected2);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should give latest value to a late observer', function () {
var e1 = cold( '--a-b---c--d--e--|');
var e1subs = '^ ! ';
var expected1 = 'x-a-b---c--d-- ';
var unsub1 = ' ! ';
var e2 = cold( '-------x----------');
var expected2 = ' bc--d--e-- ';
var unsub2 = ' ^ ! ';

var shared = e1.shareBehavior('x');
var observer1 = shared.do();
var observer2 = e2.mergeMap(function () { return shared.do(); });

expectObservable(observer1, unsub1).toBe(expected1);
expectObservable(observer2, unsub2).toBe(expected2);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not change the output of the observable when successful', function () {
var e1 = hot('---a--^--b-c--d--e--|');
var expected = 'x--b-c--d--e--|';

expectObservable(e1.shareBehavior('x')).toBe(expected);
});

it('should not change the output of the observable when error', function () {
var e1 = hot('---a--^--b-c--d--e--#');
var expected = 'x--b-c--d--e--#';

expectObservable(e1.shareBehavior('x')).toBe(expected);
});

it('should not change the output of the observable when never', function () {
var e1 = cold( '----');
var expected = 'a---';

expectObservable(e1.shareBehavior('a')).toBe(expected);
});

it('should not change the output of the observable when empty', function () {
var e1 = cold( '| ');
var expected = '(a|)';

expectObservable(e1.shareBehavior('a')).toBe(expected);
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export interface CoreOperators<T> {
sampleTime?: <T>(delay: number, scheduler?: Scheduler) => Observable<T>;
scan?: <R>(project: (acc: R, x: T) => R, acc?: R) => Observable<R>;
share?: () => Observable<T>;
shareBehavior?: (value: any) => Observable<T>;
shareReplay?: (bufferSize: number, windowTime: number, scheduler?: Scheduler) => Observable<T>;
single?: (predicate?: (value: T, index:number) => boolean, thisArg?: any) => Observable<T>;
skip?: (count: number) => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ observableProto.scan = scan;
import share from './operators/share';
observableProto.share = share;

import shareBehavior from './operators/shareBehavior';
observableProto.shareBehavior = shareBehavior;

import shareReplay from './operators/shareReplay';
observableProto.shareReplay = shareReplay;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ observableProto.scan = scan;
import share from './operators/share';
observableProto.share = share;

import shareBehavior from './operators/shareBehavior';
observableProto.shareBehavior = shareBehavior;

import shareReplay from './operators/shareReplay';
observableProto.shareReplay = shareReplay;

Expand Down
7 changes: 7 additions & 0 deletions src/operators/shareBehavior.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import Observable from '../Observable';
import Scheduler from '../Scheduler';
import publishBehavior from './publishBehavior';

export default function shareBehavior<T>(value: T): Observable<T> {
return publishBehavior.call(this, value).refCount();
}

0 comments on commit 97ff1ec

Please sign in to comment.