Skip to content

Commit

Permalink
feat(publishLast): add publishLast operator
Browse files Browse the repository at this point in the history
closes #883
  • Loading branch information
kwonoj committed Dec 4, 2015
1 parent 1ab3508 commit 9bef228
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 0 deletions.
222 changes: 222 additions & 0 deletions spec/operators/publishLast-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var Subject = Rx.Subject;

describe('Observable.prototype.publishLast()', function () {
it('should return a ConnectableObservable', function () {
var source = Observable.of(1).publishLast();

expect(source instanceof Rx.ConnectableObservable).toBe(true);
});

it('should emit last notification of a simple source Observable', function () {
var source = cold('--1-2---3-4--5-|');
var sourceSubs = '^ !';
var published = source.publishLast();
var expected = '---------------(5|)';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should do nothing if connect is not called, despite subscriptions', function () {
var source = cold('--1-2---3-4--5-|');
var sourceSubs = [];
var published = source.publishLast();
var expected = '-';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast the same values to multiple observers', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = '^ !';
var published = source.publishLast();
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------(4|)';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------(4|)';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----(4|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast an error from the source to multiple observers', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = '^ !';
var published = source.publishLast();
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------#';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------#';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should not cast any values to multiple observers, ' +
'when source is unsubscribed explicitly and early', function () {
var source = cold('-1-2-3----4-|');
var sourceSubs = '^ ! ';
var published = source.publishLast();
var unsub = ' u ';
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '---------- ';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' ------ ';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' -- ';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

// Set up unsubscription action
var connection;
expectObservable(hot(unsub).do(function () {
connection.unsubscribe();
})).toBe(unsub);

connection = published.connect();
});

describe('with refCount()', function () {
it('should connect when first subscriber subscribes', function () {
var source = cold( '-1-2-3----4-|');
var sourceSubs = ' ^ !';
var replayed = source.publishLast().refCount();
var subscriber1 = hot(' a| ').mergeMapTo(replayed);
var expected1 = ' ------------(4|)';
var subscriber2 = hot(' b| ').mergeMapTo(replayed);
var expected2 = ' --------(4|)';
var subscriber3 = hot(' c| ').mergeMapTo(replayed);
var expected3 = ' ----(4|)';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should disconnect when last subscriber unsubscribes', function () {
var source = cold( '-1-2-3----4-|');
var sourceSubs = ' ^ ! ';
var replayed = source.publishLast().refCount();
var subscriber1 = hot(' a| ').mergeMapTo(replayed);
var unsub1 = ' ! ';
var expected1 = ' -------- ';
var subscriber2 = hot(' b| ').mergeMapTo(replayed);
var unsub2 = ' ! ';
var expected2 = ' ------ ';

expectObservable(subscriber1, unsub1).toBe(expected1);
expectObservable(subscriber2, unsub2).toBe(expected2);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should NOT be retryable', function () {
var source = cold('-1-2-3----4-#');
var sourceSubs = '^ !';
var published = source.publishLast().refCount().retry(3);
var subscriber1 = hot('a| ').mergeMapTo(published);
var expected1 = '------------#';
var subscriber2 = hot(' b| ').mergeMapTo(published);
var expected2 = ' --------#';
var subscriber3 = hot(' c| ').mergeMapTo(published);
var expected3 = ' ----#';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});
});

it('should multicast an empty source', function () {
var source = cold('|');
var sourceSubs = '(^!)';
var published = source.publishLast();
var expected = '|';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast a never source', function () {
var source = cold('-');
var sourceSubs = '^';
var published = source.publishLast();
var expected = '-';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast a throw source', function () {
var source = cold('#');
var sourceSubs = '(^!)';
var published = source.publishLast();
var expected = '#';

expectObservable(published).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);

published.connect();
});

it('should multicast one observable to multiple observers', function (done) {
var results1 = [];
var results2 = [];
var subscriptions = 0;

var source = new Observable(function (observer) {
subscriptions++;
observer.next(1);
observer.next(2);
observer.next(3);
observer.next(4);
observer.complete();
});

var connectable = source.publishLast();

connectable.subscribe(function (x) {
results1.push(x);
});

connectable.subscribe(function (x) {
results2.push(x);
});

expect(results1).toEqual([]);
expect(results2).toEqual([]);

connectable.connect();

expect(results1).toEqual([4]);
expect(results2).toEqual([4]);
expect(subscriptions).toBe(1);
done();
});
});
1 change: 1 addition & 0 deletions src/CoreOperators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export interface CoreOperators<T> {
publish?: () => ConnectableObservable<T>;
publishBehavior?: (value: any) => ConnectableObservable<T>;
publishReplay?: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast?: () => ConnectableObservable<T>;
reduce?: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
repeat?: (count?: number) => Observable<T>;
retry?: (count?: number) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ export class Observable<T> implements CoreOperators<T> {
publish: () => ConnectableObservable<T>;
publishBehavior: (value: any) => ConnectableObservable<T>;
publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable<T>;
publishLast: () => ConnectableObservable<T>;
reduce: <R>(project: (acc: R, x: T) => R, seed?: R) => Observable<R>;
repeat: (count?: number) => Observable<T>;
retry: (count?: number) => Observable<T>;
Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import './add/operator/partition';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
import './add/operator/publishLast';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import './add/operator/partition';
import './add/operator/publish';
import './add/operator/publishBehavior';
import './add/operator/publishReplay';
import './add/operator/publishLast';
import './add/operator/reduce';
import './add/operator/repeat';
import './add/operator/retry';
Expand Down
4 changes: 4 additions & 0 deletions src/add/operator/publishLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import {Observable} from '../../Observable';
import {publishLast} from '../../operator/publishLast';

Observable.prototype.publishLast = publishLast;
7 changes: 7 additions & 0 deletions src/operator/publishLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {AsyncSubject} from '../subject/AsyncSubject';
import {multicast} from './multicast';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export function publishLast<T>(): ConnectableObservable<T> {
return multicast.call(this, new AsyncSubject());
}

0 comments on commit 9bef228

Please sign in to comment.