diff --git a/spec/helpers/doNotUnsubscribe.ts b/spec/helpers/doNotUnsubscribe.ts new file mode 100644 index 0000000000..22752739dc --- /dev/null +++ b/spec/helpers/doNotUnsubscribe.ts @@ -0,0 +1,16 @@ +/// +import * as Rx from '../../dist/cjs/Rx'; + +export function doNotUnsubscribe(ob: Rx.Observable): Rx.Observable { + return ob.lift(new DoNotUnsubscribeOperator()); +} + +class DoNotUnsubscribeOperator implements Rx.Operator { + call(subscriber: Rx.Subscriber, source: any): any { + return source.subscribe(new DoNotUnsubscribeSubscriber(subscriber)); + } +} + +class DoNotUnsubscribeSubscriber extends Rx.Subscriber { + unsubscribe() {} // tslint:disable-line no-empty +} \ No newline at end of file diff --git a/spec/operators/elementAt-spec.ts b/spec/operators/elementAt-spec.ts index ac27b5cc85..5d0c7a8a74 100644 --- a/spec/operators/elementAt-spec.ts +++ b/spec/operators/elementAt-spec.ts @@ -1,6 +1,7 @@ import {expect} from 'chai'; import * as Rx from '../../dist/cjs/Rx'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports +import { doNotUnsubscribe } from '../helpers/doNotUnsubscribe'; declare const { asDiagram }; declare const hot: typeof marbleTestingSignature.hot; @@ -126,4 +127,33 @@ describe('Observable.prototype.elementAt', () => { expectObservable((source).elementAt(3, defaultValue)).toBe(expected, { x: defaultValue }); expectSubscriptions(source.subscriptions).toBe(subs); }); + + it('should unsubscribe from source Observable, even if destination does not unsubscribe', () => { + const source = hot('--a--b--c-d---|'); + const subs = '^ ! '; + const expected = '--------(c|) '; + + expectObservable((source).elementAt(2).let(doNotUnsubscribe)).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should unsubscribe from source if index is out of range, even if destination does not unsubscribe', () => { + const source = hot('--a--|'); + const subs = '^ !'; + const expected = '-----#'; + + expectObservable((source).elementAt(3).let(doNotUnsubscribe)) + .toBe(expected, null, new Rx.ArgumentOutOfRangeError()); + expectSubscriptions(source.subscriptions).toBe(subs); + }); + + it('should unsubscribe when returning default value, even if destination does not unsubscribe', () => { + const source = hot('--a--|'); + const subs = '^ !'; + const expected = '-----(x|)'; + const defaultValue = '42'; + + expectObservable((source).elementAt(3, defaultValue).let(doNotUnsubscribe)).toBe(expected, { x: defaultValue }); + expectSubscriptions(source.subscriptions).toBe(subs); + }); }); diff --git a/src/operator/elementAt.ts b/src/operator/elementAt.ts index 65561e0890..a64fb12fde 100644 --- a/src/operator/elementAt.ts +++ b/src/operator/elementAt.ts @@ -78,6 +78,7 @@ class ElementAtSubscriber extends Subscriber { if (this.index-- === 0) { this.destination.next(x); this.destination.complete(); + this.unsubscribe(); } } @@ -88,8 +89,11 @@ class ElementAtSubscriber extends Subscriber { destination.next(this.defaultValue); } else { destination.error(new ArgumentOutOfRangeError); + this.unsubscribe(); + return; } } destination.complete(); + this.unsubscribe(); } }