From 7b6da9029add899446482e04e38091d5098f1911 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 1 Feb 2016 16:18:03 -0800 Subject: [PATCH] fix(Observer): fix typing to allow observation via partial observables with PartialObservable - Observer is the "full" observer interface - PartialObserver is any object with at least one method of the Observer interface - Updates subscribe signature and Notification signatures to use PartialObserver instead of Observer Related #1260 --- src/Notification.ts | 22 +++++++++++----------- src/Observable.ts | 6 +++--- src/Observer.ts | 31 +++++++++++++++++++++++++++---- src/Subscriber.ts | 18 +++++++++--------- src/operator/mergeMapTo.ts | 4 ++-- src/operator/observeOn.ts | 4 ++-- src/operator/zip.ts | 4 ++-- src/util/toSubscriber.ts | 16 ++++++++-------- 8 files changed, 64 insertions(+), 41 deletions(-) diff --git a/src/Notification.ts b/src/Notification.ts index c62028a6fd..86b7c2f32f 100644 --- a/src/Notification.ts +++ b/src/Notification.ts @@ -1,4 +1,4 @@ -import {Observer} from './Observer'; +import {PartialObserver} from './Observer'; import {Observable} from './Observable'; export class Notification { @@ -8,14 +8,14 @@ export class Notification { this.hasValue = kind === 'N'; } - observe(observer: Observer): any { + observe(observer: PartialObserver): any { switch (this.kind) { case 'N': - return observer.next(this.value); + return observer.next && observer.next(this.value); case 'E': - return observer.error(this.exception); + return observer.error && observer.error(this.exception); case 'C': - return observer.complete(); + return observer.complete && observer.complete(); } } @@ -23,17 +23,17 @@ export class Notification { const kind = this.kind; switch (kind) { case 'N': - return next(this.value); + return next && next(this.value); case 'E': - return error(this.exception); + return error && error(this.exception); case 'C': - return complete(); + return complete && complete(); } } - accept(nextOrObserver: Observer | ((value: T) => void), error?: (err: any) => void, complete?: () => void) { - if (nextOrObserver && typeof (>nextOrObserver).next === 'function') { - return this.observe(>nextOrObserver); + accept(nextOrObserver: PartialObserver | ((value: T) => void), error?: (err: any) => void, complete?: () => void) { + if (nextOrObserver && typeof (>nextOrObserver).next === 'function') { + return this.observe(>nextOrObserver); } else { return this.do(<(value: T) => void>nextOrObserver, error, complete); } diff --git a/src/Observable.ts b/src/Observable.ts index cb75fe70b6..c28ad16cc8 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -1,4 +1,4 @@ -import {Observer} from './Observer'; +import {PartialObserver} from './Observer'; import {Operator} from './Operator'; import {Scheduler} from './Scheduler'; import {Subscriber} from './Subscriber'; @@ -98,7 +98,7 @@ export class Observable implements CoreOperators { /** * @method subscribe - * @param {Observer|Function} observerOrNext (optional) either an observer defining all functions to be called, + * @param {PartialObserver|Function} observerOrNext (optional) either an observer defining all functions to be called, * or the first of three possible handlers, which is the handler for each value emitted from the observable. * @param {Function} error (optional) a handler for a terminal event resulting from an error. If no error handler is provided, * the error will be thrown as unhandled @@ -107,7 +107,7 @@ export class Observable implements CoreOperators { * @description registers handlers for handling emitted values, error and completions from the observable, and * executes the observable's subscriber function, which will take action to set up the underlying data stream */ - subscribe(observerOrNext?: Observer | ((value: T) => void), + subscribe(observerOrNext?: PartialObserver | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscription { diff --git a/src/Observer.ts b/src/Observer.ts index bf6547c3ae..2f232f6549 100644 --- a/src/Observer.ts +++ b/src/Observer.ts @@ -1,8 +1,31 @@ +export interface NextObserver { + isUnsubscribed?: boolean; + next: (value: T) => void; + error?: (err: any) => void; + complete?: () => void; +} + +export interface ErrorObserver { + isUnsubscribed?: boolean; + next?: (value: T) => void; + error: (err: any) => void; + complete?: () => void; +} + +export interface CompletionObserver { + isUnsubscribed?: boolean; + next?: (value: T) => void; + error?: (err: any) => void; + complete: () => void; +} + +export type PartialObserver = NextObserver | ErrorObserver | CompletionObserver; + export interface Observer { - isUnsubscribed: boolean; - next(value: T): void; - error(error: any): void; - complete(): void; + isUnsubscribed?: boolean; + next: (value: T) => void; + error: (err: any) => void; + complete: () => void; } export const empty: Observer = { diff --git a/src/Subscriber.ts b/src/Subscriber.ts index a231bdae73..35e72e0d29 100644 --- a/src/Subscriber.ts +++ b/src/Subscriber.ts @@ -1,5 +1,5 @@ import {isFunction} from './util/isFunction'; -import {Observer} from './Observer'; +import {Observer, PartialObserver} from './Observer'; import {Subscription} from './Subscription'; import {rxSubscriber} from './symbol/rxSubscriber'; import {empty as emptyObserver} from './Observer'; @@ -19,9 +19,9 @@ export class Subscriber extends Subscription implements Observer { public syncErrorThrowable: boolean = false; protected isStopped: boolean = false; - protected destination: Observer; + protected destination: PartialObserver; - constructor(destinationOrNext?: Observer | ((value: T) => void), + constructor(destinationOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); @@ -37,10 +37,10 @@ export class Subscriber extends Subscription implements Observer { } if (typeof destinationOrNext === 'object') { if (destinationOrNext instanceof Subscriber) { - this.destination = (> destinationOrNext); + this.destination = (> destinationOrNext); } else { this.syncErrorThrowable = true; - this.destination = new SafeSubscriber(this, > destinationOrNext); + this.destination = new SafeSubscriber(this, > destinationOrNext); } break; } @@ -103,7 +103,7 @@ class SafeSubscriber extends Subscriber { private _context: any; constructor(private _parent: Subscriber, - observerOrNext?: Observer | ((value: T) => void), + observerOrNext?: PartialObserver | ((value: T) => void), error?: (e?: any) => void, complete?: () => void) { super(); @@ -115,9 +115,9 @@ class SafeSubscriber extends Subscriber { next = (<((value: T) => void)> observerOrNext); } else if (observerOrNext) { context = observerOrNext; - next = (> observerOrNext).next; - error = (> observerOrNext).error; - complete = (> observerOrNext).complete; + next = (> observerOrNext).next; + error = (> observerOrNext).error; + complete = (> observerOrNext).complete; } this._context = context; diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index da8d1c14c3..9af1464884 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -1,6 +1,6 @@ import {Observable} from '../Observable'; import {Operator} from '../Operator'; -import {Observer} from '../Observer'; +import {PartialObserver} from '../Observer'; import {Subscriber} from '../Subscriber'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; @@ -54,7 +54,7 @@ export class MergeMapToSubscriber extends OuterSubscriber { } private _innerSub(ish: any, - destination: Observer, + destination: PartialObserver, resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => R2, value: T, index: number): void { diff --git a/src/operator/observeOn.ts b/src/operator/observeOn.ts index 8ee98ae433..aca666be84 100644 --- a/src/operator/observeOn.ts +++ b/src/operator/observeOn.ts @@ -1,7 +1,7 @@ import {Observable} from '../Observable'; import {Scheduler} from '../Scheduler'; import {Operator} from '../Operator'; -import {Observer} from '../Observer'; +import {PartialObserver} from '../Observer'; import {Subscriber} from '../Subscriber'; import {Notification} from '../Notification'; @@ -50,6 +50,6 @@ export class ObserveOnSubscriber extends Subscriber { class ObserveOnMessage { constructor(public notification: Notification, - public destination: Observer) { + public destination: PartialObserver) { } } \ No newline at end of file diff --git a/src/operator/zip.ts b/src/operator/zip.ts index 425933e5a9..ef73b7c195 100644 --- a/src/operator/zip.ts +++ b/src/operator/zip.ts @@ -2,7 +2,7 @@ import {Observable} from '../Observable'; import {ArrayObservable} from '../observable/ArrayObservable'; import {isArray} from '../util/isArray'; import {Operator} from '../Operator'; -import {Observer} from '../Observer'; +import {PartialObserver} from '../Observer'; import {Subscriber} from '../Subscriber'; import {OuterSubscriber} from '../OuterSubscriber'; import {InnerSubscriber} from '../InnerSubscriber'; @@ -199,7 +199,7 @@ class ZipBufferIterator extends OuterSubscriber implements LookAhead buffer: T[] = []; isComplete = false; - constructor(destination: Observer, + constructor(destination: PartialObserver, private parent: ZipSubscriber, private observable: Observable, private index: number) { diff --git a/src/util/toSubscriber.ts b/src/util/toSubscriber.ts index 72c8316f87..a53ac78518 100644 --- a/src/util/toSubscriber.ts +++ b/src/util/toSubscriber.ts @@ -1,19 +1,19 @@ -import {Observer} from '../Observer'; +import {PartialObserver} from '../Observer'; import {Subscriber} from '../Subscriber'; import {rxSubscriber} from '../symbol/rxSubscriber'; export function toSubscriber( - next?: Observer | ((value: T) => void), + nextOrObserver?: PartialObserver | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Subscriber { - if (next && typeof next === 'object') { - if (next instanceof Subscriber) { - return (> next); - } else if (typeof next[rxSubscriber] === 'function') { - return next[rxSubscriber](); + if (nextOrObserver && typeof nextOrObserver === 'object') { + if (nextOrObserver instanceof Subscriber) { + return (> nextOrObserver); + } else if (typeof nextOrObserver[rxSubscriber] === 'function') { + return nextOrObserver[rxSubscriber](); } } - return new Subscriber(next, error, complete); + return new Subscriber(nextOrObserver, error, complete); }