-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support optional selector for publish / multicast #1659
Changes from all commits
05d47de
b3a27e9
56f4989
b6fca2d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldMulticastWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) | ||
.multicast(function () { | ||
return new RxOld.Subject(); | ||
}, function (x) { | ||
return x; | ||
}); | ||
var newMulticastWithImmediateScheduler = RxNew.Observable.range(0, 25) | ||
.multicast(function () { | ||
return new RxNew.Subject(); | ||
}, function (x) { | ||
return x; | ||
}); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
return suite | ||
.add('old multicast with selector and immediate scheduler', function () { | ||
oldMulticastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new multicast with selector and immediate scheduler', function () { | ||
newMulticastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldSubject = new RxOld.Subject(); | ||
var newSubject = new RxNew.Subject(); | ||
|
||
var oldMulticastWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) | ||
.multicast(oldSubject); | ||
var newMulticastWithImmediateScheduler = RxNew.Observable.range(0, 25) | ||
.multicast(newSubject); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
|
||
oldMulticastWithImmediateScheduler.connect(); | ||
newMulticastWithImmediateScheduler.connect(); | ||
|
||
return suite | ||
.add('old multicast with immediate scheduler', function () { | ||
oldMulticastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new multicast with immediate scheduler', function () { | ||
newMulticastWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldPublishWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate) | ||
.publish(function (x) { | ||
return x; | ||
}); | ||
var newPublishWithImmediateScheduler = RxNew.Observable.range(0, 25) | ||
.publish(function (x) { | ||
return x; | ||
}); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
return suite | ||
.add('old publish with selector and immediate scheduler', function () { | ||
oldPublishWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new publish with selector and immediate scheduler', function () { | ||
newPublishWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
var RxOld = require('rx'); | ||
var RxNew = require('../../../../index'); | ||
|
||
module.exports = function (suite) { | ||
var oldPublishWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate).publish(); | ||
var newPublishWithImmediateScheduler = RxNew.Observable.range(0, 25).publish(); | ||
|
||
function _next(x) { } | ||
function _error(e) { } | ||
function _complete() { } | ||
|
||
oldPublishWithImmediateScheduler.connect(); | ||
newPublishWithImmediateScheduler.connect(); | ||
|
||
return suite | ||
.add('old publish with immediate scheduler', function () { | ||
oldPublishWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}) | ||
.add('new publish with immediate scheduler', function () { | ||
newPublishWithImmediateScheduler.subscribe(_next, _error, _complete); | ||
}); | ||
}; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
import {Observable} from '../Observable'; | ||
import {Subscriber} from '../Subscriber'; | ||
import {Subscription} from '../Subscription'; | ||
import {ConnectableObservable} from '../observable/ConnectableObservable'; | ||
|
||
export class MulticastObservable<T> extends Observable<T> { | ||
constructor(protected source: Observable<T>, | ||
private connectable: ConnectableObservable<T>, | ||
private selector: (source: Observable<T>) => Observable<T>) { | ||
super(); | ||
} | ||
|
||
protected _subscribe(subscriber: Subscriber<T>): Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kwonoj shouldn't this really be done with an operator? Why introduce another Observable type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't have strong reasoning behind this, actually. So suggest to remove this observable type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, it'll make the ConnectableObservable changes I'm working on easier to integrate. |
||
const {selector, connectable} = this; | ||
|
||
const subscription = selector(connectable).subscribe(subscriber); | ||
subscription.add(connectable.connect()); | ||
return subscription; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,6 @@ | ||
import {Subject} from '../Subject'; | ||
import {Observable} from '../Observable'; | ||
import {MulticastObservable} from '../observable/MulticastObservable'; | ||
import {ConnectableObservable} from '../observable/ConnectableObservable'; | ||
|
||
/** | ||
|
@@ -7,7 +9,10 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; | |
* | ||
* <img src="./img/multicast.png" width="100%"> | ||
* | ||
* @param {Function} selector - a function that can use the multicasted source stream | ||
* @param {Function|Subject} Factory function to create an intermediate subject through | ||
* which the source sequence's elements will be multicast to the selector function | ||
* or Subject to push source elements into. | ||
* @param {Function} Optional selector function that can use the multicasted source stream | ||
* as many times as needed, without causing multiple subscriptions to the source stream. | ||
* Subscribers to the given source will receive all notifications of the source from the | ||
* time of the subscription forward. | ||
|
@@ -17,7 +22,8 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; | |
* @method multicast | ||
* @owner Observable | ||
*/ | ||
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>)): ConnectableObservable<T> { | ||
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>), | ||
selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not in love with the name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. me too. followed RxJS 4 convention, since wasn't able to come up with better name. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
let subjectFactory: () => Subject<T>; | ||
if (typeof subjectOrSubjectFactory === 'function') { | ||
subjectFactory = <() => Subject<T>>subjectOrSubjectFactory; | ||
|
@@ -26,11 +32,15 @@ export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subjec | |
return <Subject<T>>subjectOrSubjectFactory; | ||
}; | ||
} | ||
return new ConnectableObservable(this, subjectFactory); | ||
|
||
const connectable = new ConnectableObservable(this, subjectFactory); | ||
return selector ? new MulticastObservable(this, connectable, selector) : connectable; | ||
} | ||
|
||
export type factoryOrValue<T> = T | (() => T); | ||
export type selector<T> = (source: Observable<T>) => Observable<T>; | ||
|
||
export interface MulticastSignature<T> { | ||
(subjectOrSubjectFactory: factoryOrValue<Subject<T>>): ConnectableObservable<T>; | ||
(SubjectFactory: () => Subject<T>, selector?: selector<T>): Observable<T>; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems like a smell. Why do we need to do this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To support different signature
multicast(subject)
/multicast(factory, selector?)
.As similar to other operators have different signatures
mergeMapTo
/add/mergeMapTo
if there's completely different two signatures, can't be assigned into prototype directly since function definition is single, polymorphic.