Skip to content

Commit a6df26d

Browse files
committed
fix(Observable): Fix Observable.subscribe to add operator TeardownLogic to returned Subscription.
1 parent be19ce9 commit a6df26d

File tree

3 files changed

+35
-7
lines changed

3 files changed

+35
-7
lines changed

spec/Observable-spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import { Observer, TeardownLogic } from '../src/internal/types';
55
import { cold, expectObservable, expectSubscriptions } from './helpers/marble-testing';
66
import { map } from '../src/internal/operators/map';
77
import { noop } from '../src/internal/util/noop';
8+
import { NEVER } from '../src/internal/observable/never';
9+
import { Subscriber } from '../src/internal/Subscriber';
10+
import { Operator } from '../src/internal/Operator';
811

912
declare const asDiagram: any, rxTestScheduler: any;
1013
const Observable = Rx.Observable;
@@ -697,6 +700,24 @@ describe('Observable.lift', () => {
697700
}
698701
}
699702

703+
it('should return Observable which calls TeardownLogic of operator on unsubscription', (done) => {
704+
705+
const myOperator: Operator<any, any> = {
706+
call: (subscriber: Subscriber<any>, source: any) => {
707+
const subscription = source.subscribe((x: any) => subscriber.next(x));
708+
return () => {
709+
subscription.unsubscribe();
710+
done();
711+
};
712+
}
713+
};
714+
715+
NEVER.lift(myOperator)
716+
.subscribe()
717+
.unsubscribe();
718+
719+
});
720+
700721
it('should be overrideable in a custom Observable type that composes', (done) => {
701722
const result = new MyCustomObservable<number>((observer) => {
702723
observer.next(1);

src/internal/Observable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ export class Observable<T> implements Subscribable<T> {
204204
const sink = toSubscriber(observerOrNext, error, complete);
205205

206206
if (operator) {
207-
operator.call(sink, this.source);
207+
sink.add(operator.call(sink, this.source));
208208
} else {
209209
sink.add(
210210
this.source || (config.useDeprecatedSynchronousErrorHandling && !sink.syncErrorThrowable) ?

src/internal/Subscription.ts

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ export class Subscription implements SubscriptionLike {
174174

175175
const subscriptions = this._subscriptions || (this._subscriptions = []);
176176

177-
subscriptions.push(subscription);
178-
subscription._addParent(this);
177+
if (subscription._addParent(this)) {
178+
subscriptions.push(subscription);
179+
}
179180

180181
return subscription;
181182
}
@@ -197,20 +198,26 @@ export class Subscription implements SubscriptionLike {
197198
}
198199

199200
/** @internal */
200-
private _addParent(parent: Subscription) {
201+
private _addParent(parent: Subscription): boolean {
201202
let { _parent, _parents } = this;
202-
if (!_parent || _parent === parent) {
203-
// If we don't have a parent, or the new parent is the same as the
204-
// current parent, then set this._parent to the new parent.
203+
if (_parent === parent) {
204+
// If the new parent is the same as the current parent, then do nothing.
205+
return false;
206+
} else if (!_parent) {
207+
// If we don't have a parent, then set this._parent to the new parent.
205208
this._parent = parent;
209+
return true;
206210
} else if (!_parents) {
207211
// If there's already one parent, but not multiple, allocate an Array to
208212
// store the rest of the parent Subscriptions.
209213
this._parents = [parent];
214+
return true;
210215
} else if (_parents.indexOf(parent) === -1) {
211216
// Only add the new parent to the _parents list if it's not already there.
212217
_parents.push(parent);
218+
return true;
213219
}
220+
return false;
214221
}
215222
}
216223

0 commit comments

Comments
 (0)