-
Notifications
You must be signed in to change notification settings - Fork 3k
Subscriber refactor for version 8 #6817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7380551
4e9f2a9
af10149
be4e9cd
85aa241
0addb82
035b873
0a22d06
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
const _bind = Function.prototype.bind; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
const _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
const _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
const _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
const _bind = Function.prototype.bind; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
const _bind = Function.prototype.bind; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1 @@ | ||
import "tslib"; | ||
|
||
var _bind = Function.prototype.bind; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,45 +18,31 @@ import { timeoutProvider } from './scheduler/timeoutProvider'; | |
* @class Subscriber<T> | ||
*/ | ||
export class Subscriber<T> extends Subscription implements Observer<T> { | ||
/** | ||
* A static factory for a Subscriber, given a (potentially partial) definition | ||
* of an Observer. | ||
* @param next The `next` callback of an Observer. | ||
* @param error The `error` callback of an | ||
* Observer. | ||
* @param complete The `complete` callback of an | ||
* Observer. | ||
* @return A Subscriber wrapping the (partially defined) | ||
* Observer represented by the given arguments. | ||
* @nocollapse | ||
* @deprecated Do not use. Will be removed in v8. There is no replacement for this | ||
* method, and there is no reason to be creating instances of `Subscriber` directly. | ||
* If you have a specific use case, please file an issue. | ||
*/ | ||
static create<T>(next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void): Subscriber<T> { | ||
return new SafeSubscriber({ next, error, complete }); | ||
} | ||
|
||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ | ||
protected isStopped: boolean = false; | ||
/** @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. */ | ||
protected destination: Subscriber<any> | Observer<any>; // this `any` is the escape hatch to erase extra type param (e.g. R) | ||
protected destination: Subscriber<T> | Observer<T>; | ||
|
||
/** | ||
* @deprecated Internal implementation detail, do not use directly. Will be made internal in v8. | ||
* There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons. | ||
* Creates an instance of an RxJS Subscriber. This is the workhorse of the library. | ||
* | ||
* If another instance of Subscriber is passed in, it will automatically wire up unsubscription | ||
* between this instnace and the passed in instance. | ||
* | ||
* If a partial or full observer is passed in, it will be wrapped and appropriate safeguards will be applied. | ||
* | ||
* If a next-handler function is passed in, it will be wrapped and appropriate safeguards will be applied. | ||
* | ||
* @param destination A subscriber, partial observer, or function that receives the next value. | ||
*/ | ||
constructor(destination?: Subscriber<any> | Observer<any>) { | ||
constructor(destination?: Subscriber<T> | Partial<Observer<T>> | ((value: T) => void) | null) { | ||
super(); | ||
if (destination) { | ||
this.destination = destination; | ||
// Automatically chain subscriptions together here. | ||
// if destination is a Subscription, then it is a Subscriber. | ||
if (isSubscription(destination)) { | ||
destination.add(this); | ||
} | ||
} else { | ||
this.destination = EMPTY_OBSERVER; | ||
this.destination = isSubscriber(destination) ? destination : createSafeObserver(destination); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. important magic here! If we have a If you step through the commits, you find out that this is all |
||
|
||
// Automatically chain subscriptions together here. | ||
// if destination is a Subscription, then it is a Subscriber. | ||
if (isSubscription(destination)) { | ||
destination.add(this); | ||
} | ||
} | ||
|
||
|
@@ -135,21 +121,6 @@ export class Subscriber<T> extends Subscription implements Observer<T> { | |
} | ||
} | ||
|
||
/** | ||
* This bind is captured here because we want to be able to have | ||
* compatibility with monoid libraries that tend to use a method named | ||
* `bind`. In particular, a library called Monio requires this. | ||
*/ | ||
const _bind = Function.prototype.bind; | ||
|
||
function bind<Fn extends (...args: any[]) => any>(fn: Fn, thisArg: any): Fn { | ||
return _bind.call(fn, thisArg); | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The bind stuff can be removed! Yay. |
||
|
||
/** | ||
* Internal optimization only, DO NOT EXPOSE. | ||
* @internal | ||
*/ | ||
class ConsumerObserver<T> implements Observer<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is from #6815 |
||
constructor(private partialObserver: Partial<Observer<T>>) {} | ||
|
||
|
@@ -189,26 +160,8 @@ class ConsumerObserver<T> implements Observer<T> { | |
} | ||
} | ||
|
||
export class SafeSubscriber<T> extends Subscriber<T> { | ||
constructor(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null) { | ||
super(); | ||
|
||
let partialObserver: Partial<Observer<T>>; | ||
if (isFunction(observerOrNext) || !observerOrNext) { | ||
// The first argument is a function, not an observer. The next | ||
// two arguments *could* be observers, or they could be empty. | ||
partialObserver = { | ||
next: observerOrNext ?? undefined, | ||
}; | ||
} else { | ||
// The "normal" path. Just use the partial observer directly. | ||
partialObserver = observerOrNext; | ||
} | ||
|
||
// Wrap the partial observer to ensure it's a full observer, and | ||
// make sure proper error handling is accounted for. | ||
this.destination = new ConsumerObserver(partialObserver); | ||
} | ||
function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null): Observer<T> { | ||
return new ConsumerObserver(!observerOrNext || isFunction(observerOrNext) ? { next: observerOrNext ?? undefined } : observerOrNext); | ||
} | ||
|
||
/** | ||
|
@@ -242,3 +195,11 @@ export const EMPTY_OBSERVER: Readonly<Observer<any>> & { closed: true } = { | |
error: defaultErrorHandler, | ||
complete: noop, | ||
}; | ||
|
||
function isObserver<T>(value: any): value is Observer<T> { | ||
return value && isFunction(value.next) && isFunction(value.error) && isFunction(value.complete); | ||
} | ||
|
||
export function isSubscriber<T>(value: any): value is Subscriber<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These were just moved. |
||
return (value && value instanceof Subscriber) || (isObserver(value) && isSubscription(value)); | ||
} |
Uh oh!
There was an error while loading. Please reload this page.