Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support optional selector for publish / multicast #1659

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions perf/micro/immediate-scheduler/operators/multicast-selector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldMulticastWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.multicast(function () {
return new RxOld.Subject();
}, function (x) {
return x;
});
var newMulticastWithImmediateScheduler = RxNew.Observable.range(0, 25)
.multicast(function () {
return new RxNew.Subject();
}, function (x) {
return x;
});

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old multicast with selector and immediate scheduler', function () {
oldMulticastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new multicast with selector and immediate scheduler', function () {
newMulticastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
27 changes: 27 additions & 0 deletions perf/micro/immediate-scheduler/operators/multicast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldSubject = new RxOld.Subject();
var newSubject = new RxNew.Subject();

var oldMulticastWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.multicast(oldSubject);
var newMulticastWithImmediateScheduler = RxNew.Observable.range(0, 25)
.multicast(newSubject);

function _next(x) { }
function _error(e) { }
function _complete() { }

oldMulticastWithImmediateScheduler.connect();
newMulticastWithImmediateScheduler.connect();

return suite
.add('old multicast with immediate scheduler', function () {
oldMulticastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new multicast with immediate scheduler', function () {
newMulticastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/publish-selector.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldPublishWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.publish(function (x) {
return x;
});
var newPublishWithImmediateScheduler = RxNew.Observable.range(0, 25)
.publish(function (x) {
return x;
});

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old publish with selector and immediate scheduler', function () {
oldPublishWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new publish with selector and immediate scheduler', function () {
newPublishWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
22 changes: 22 additions & 0 deletions perf/micro/immediate-scheduler/operators/publish.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldPublishWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).publish();
var newPublishWithImmediateScheduler = RxNew.Observable.range(0, 25).publish();

function _next(x) { }
function _error(e) { }
function _complete() { }

oldPublishWithImmediateScheduler.connect();
newPublishWithImmediateScheduler.connect();

return suite
.add('old publish with immediate scheduler', function () {
oldPublishWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new publish with immediate scheduler', function () {
newPublishWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
18 changes: 18 additions & 0 deletions spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ describe('Observable.prototype.multicast', () => {
connectable.connect();
});

it('should accept selectors to factory functions', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
const expected1 = '-2-4-6----8-|';
const subscriber2 = hot(' b| ').mergeMapTo(multicasted);
const expected2 = ' -6----8-|';
const subscriber3 = hot(' c| ').mergeMapTo(multicasted);
const expected3 = ' --8-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should do nothing if connect is not called, despite subscriptions', () => {
const source = cold('--1-2---3-4--5-|');
const sourceSubs = [];
Expand Down
17 changes: 17 additions & 0 deletions spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ describe('Observable.prototype.publish', () => {
published.connect();
});

it('should accept selectors', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(published);
const expected1 = '-2-4-6----8-|';
const subscriber2 = hot(' b| ').mergeMapTo(published);
const expected2 = ' -6----8-|';
const subscriber3 = hot(' c| ').mergeMapTo(published);
const expected3 = ' --8-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an error from the source to multiple observers', () => {
const source = cold('-1-2-3----4-#');
const sourceSubs = '^ !';
Expand Down
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ export {Subscriber} from './Subscriber';
export {AsyncSubject} from './AsyncSubject';
export {ReplaySubject} from './ReplaySubject';
export {BehaviorSubject} from './BehaviorSubject';
export {MulticastObservable} from './observable/MulticastObservable';
export {ConnectableObservable} from './observable/ConnectableObservable';
export {Notification} from './Notification';
export {EmptyError} from './util/EmptyError';
Expand Down
2 changes: 1 addition & 1 deletion src/add/operator/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {Observable} from '../../Observable';
import {multicast, MulticastSignature} from '../../operator/multicast';

Observable.prototype.multicast = multicast;
Observable.prototype.multicast = <any>multicast;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a smell. Why do we need to do this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support different signature multicast(subject) / multicast(factory, selector?).

As similar to other operators have different signatures mergeMapTo / add/mergeMapTo if there's completely different two signatures, can't be assigned into prototype directly since function definition is single, polymorphic.


declare module '../../Observable' {
interface Observable<T> {
Expand Down
2 changes: 1 addition & 1 deletion src/add/operator/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {Observable} from '../../Observable';
import {publish, PublishSignature} from '../../operator/publish';

Observable.prototype.publish = publish;
Observable.prototype.publish = <any>publish;

declare module '../../Observable' {
interface Observable<T> {
Expand Down
20 changes: 20 additions & 0 deletions src/observable/MulticastObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export class MulticastObservable<T> extends Observable<T> {
constructor(protected source: Observable<T>,
private connectable: ConnectableObservable<T>,
private selector: (source: Observable<T>) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kwonoj shouldn't this really be done with an operator? Why introduce another Observable type?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have strong reasoning behind this, actually. So suggest to remove this observable type?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it'll make the ConnectableObservable changes I'm working on easier to integrate.

const {selector, connectable} = this;

const subscription = selector(connectable).subscribe(subscriber);
subscription.add(connectable.connect());
return subscription;
}
}
16 changes: 13 additions & 3 deletions src/operator/multicast.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {MulticastObservable} from '../observable/MulticastObservable';
import {ConnectableObservable} from '../observable/ConnectableObservable';

/**
Expand All @@ -7,7 +9,10 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
*
* <img src="./img/multicast.png" width="100%">
*
* @param {Function} selector - a function that can use the multicasted source stream
* @param {Function|Subject} Factory function to create an intermediate subject through
* which the source sequence's elements will be multicast to the selector function
* or Subject to push source elements into.
* @param {Function} Optional selector function that can use the multicasted source stream
* as many times as needed, without causing multiple subscriptions to the source stream.
* Subscribers to the given source will receive all notifications of the source from the
* time of the subscription forward.
Expand All @@ -17,7 +22,8 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
* @method multicast
* @owner Observable
*/
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>)): ConnectableObservable<T> {
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not in love with the name selector for this. But I'm not really sure what else to call it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me too. followed RxJS 4 convention, since wasn't able to come up with better name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sandbox?
container?
shareAndJoin?
shareJoin?
divergeAndConverge?
converge?
work?
action?

let subjectFactory: () => Subject<T>;
if (typeof subjectOrSubjectFactory === 'function') {
subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
Expand All @@ -26,11 +32,15 @@ export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subjec
return <Subject<T>>subjectOrSubjectFactory;
};
}
return new ConnectableObservable(this, subjectFactory);

const connectable = new ConnectableObservable(this, subjectFactory);
return selector ? new MulticastObservable(this, connectable, selector) : connectable;
}

export type factoryOrValue<T> = T | (() => T);
export type selector<T> = (source: Observable<T>) => Observable<T>;

export interface MulticastSignature<T> {
(subjectOrSubjectFactory: factoryOrValue<Subject<T>>): ConnectableObservable<T>;
(SubjectFactory: () => Subject<T>, selector?: selector<T>): Observable<T>;
}
12 changes: 10 additions & 2 deletions src/operator/publish.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {multicast} from './multicast';
import {ConnectableObservable} from '../observable/ConnectableObservable';

Expand All @@ -8,14 +9,21 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
*
* <img src="./img/publish.png" width="100%">
*
* @param {Function} Optional selector function which can use the multicasted source sequence as many times as needed,
* without causing multiple subscriptions to the source sequence.
* Subscribers to the given source will receive all notifications of the source from the time of the subscription on.
* @return a ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.
* @method publish
* @owner Observable
*/
export function publish<T>(): ConnectableObservable<T> {
return multicast.call(this, new Subject<T>());
export function publish<T>(selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
return selector ? multicast.call(this, () => new Subject<T>(), selector) :
multicast.call(this, new Subject<T>());
}

export type selector<T> = (source: Observable<T>) => Observable<T>;

export interface PublishSignature<T> {
(): ConnectableObservable<T>;
(selector: selector<T>): Observable<T>;
}