Skip to content

Commit 0e5991d

Browse files
kwonojbenlesh
authored andcommitted
feat(publish): support optional selectors
closes #1629
1 parent 32fa3a4 commit 0e5991d

File tree

3 files changed

+28
-3
lines changed

3 files changed

+28
-3
lines changed

spec/operators/publish-spec.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,23 @@ describe('Observable.prototype.publish', () => {
7777
published.connect();
7878
});
7979

80+
it('should accept selectors', () => {
81+
const source = hot('-1-2-3----4-|');
82+
const sourceSubs = ['^ !'];
83+
const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
84+
const subscriber1 = hot('a| ').mergeMapTo(published);
85+
const expected1 = '-2-4-6----8-|';
86+
const subscriber2 = hot(' b| ').mergeMapTo(published);
87+
const expected2 = ' -6----8-|';
88+
const subscriber3 = hot(' c| ').mergeMapTo(published);
89+
const expected3 = ' --8-|';
90+
91+
expectObservable(subscriber1).toBe(expected1);
92+
expectObservable(subscriber2).toBe(expected2);
93+
expectObservable(subscriber3).toBe(expected3);
94+
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
95+
});
96+
8097
it('should multicast an error from the source to multiple observers', () => {
8198
const source = cold('-1-2-3----4-#');
8299
const sourceSubs = '^ !';

src/add/operator/publish.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import {Observable} from '../../Observable';
33
import {publish, PublishSignature} from '../../operator/publish';
44

5-
Observable.prototype.publish = publish;
5+
Observable.prototype.publish = <any>publish;
66

77
declare module '../../Observable' {
88
interface Observable<T> {

src/operator/publish.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {Subject} from '../Subject';
2+
import {Observable} from '../Observable';
23
import {multicast} from './multicast';
34
import {ConnectableObservable} from '../observable/ConnectableObservable';
45

@@ -8,14 +9,21 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
89
*
910
* <img src="./img/publish.png" width="100%">
1011
*
12+
* @param {Function} Optional selector function which can use the multicasted source sequence as many times as needed,
13+
* without causing multiple subscriptions to the source sequence.
14+
* Subscribers to the given source will receive all notifications of the source from the time of the subscription on.
1115
* @return a ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.
1216
* @method publish
1317
* @owner Observable
1418
*/
15-
export function publish<T>(): ConnectableObservable<T> {
16-
return multicast.call(this, new Subject<T>());
19+
export function publish<T>(selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
20+
return selector ? multicast.call(this, () => new Subject<T>(), selector) :
21+
multicast.call(this, new Subject<T>());
1722
}
1823

24+
export type selector<T> = (source: Observable<T>) => Observable<T>;
25+
1926
export interface PublishSignature<T> {
2027
(): ConnectableObservable<T>;
28+
(selector: selector<T>): Observable<T>;
2129
}

0 commit comments

Comments
 (0)