From 1b30aa91ae801af64670ed23000f215c309c55cd Mon Sep 17 00:00:00 2001 From: kwonoj Date: Thu, 15 Oct 2015 02:23:55 -0700 Subject: [PATCH] fix(buffer): cleanup notifier subscription when unsubscribed --- spec/operators/buffer-spec.js | 21 +++++++++++++++++++-- src/operators/buffer.ts | 10 ++++++++-- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/spec/operators/buffer-spec.js b/spec/operators/buffer-spec.js index b662891538..521db6b43a 100644 --- a/spec/operators/buffer-spec.js +++ b/spec/operators/buffer-spec.js @@ -1,8 +1,8 @@ -/* globals describe, it, expect, expectObservable, hot */ +/* globals describe, it, expect, expectObservable, expectSubscriptions, hot */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; -describe('Observable.prototype.buffer', function () { +describe('Observable.prototype.buffer()', function () { it('should work with empty and empty selector', function () { var a = Observable.empty(); var b = Observable.empty(); @@ -167,4 +167,21 @@ describe('Observable.prototype.buffer', function () { expectObservable(a.buffer(b)).toBe(expected, expectedValues, new Error('too bad')); expectSubscriptions(a.subscriptions).toBe(subs); }); + + it('should unsubscribe notifier when source unsubscribed', function () { + var a = hot('--1--2--^--3--4--5---6----7--8--9---0---|'); + var unsub = ' ! '; + var subs = '^ ! '; + var b = hot('--------^--a-------b---cd| '); + var bsubs = '^ ! '; + var expected = '---a-------b--- '; + var expectedValues = { + a: ['3'], + b: ['4', '5'] + }; + + expectObservable(a.buffer(b), unsub).toBe(expected, expectedValues); + expectSubscriptions(a.subscriptions).toBe(subs); + expectSubscriptions(b.subscriptions).toBe(bsubs); + }); }); diff --git a/src/operators/buffer.ts b/src/operators/buffer.ts index e66ccd2830..934cad9dc4 100644 --- a/src/operators/buffer.ts +++ b/src/operators/buffer.ts @@ -30,11 +30,13 @@ class BufferOperator implements Operator { } class BufferSubscriber extends Subscriber { - buffer: T[] = []; + private buffer: T[] = []; + private notifierSubscriber: BufferClosingNotifierSubscriber = null; constructor(destination: Subscriber, closingNotifier: Observable) { super(destination); - this.add(closingNotifier._subscribe(new BufferClosingNotifierSubscriber(this))); + this.notifierSubscriber = new BufferClosingNotifierSubscriber(this); + this.add(closingNotifier._subscribe(this.notifierSubscriber)); } _next(value: T) { @@ -53,6 +55,10 @@ class BufferSubscriber extends Subscriber { const buffer = this.buffer; this.buffer = []; this.destination.next(buffer); + + if (this.isUnsubscribed) { + this.notifierSubscriber.unsubscribe(); + } } }