diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index a129b1b6f4b75d..8a7567ffc2fef8 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -115,6 +115,7 @@ const kHasFlowing = 1 << 23; const kFlowing = 1 << 24; const kHasPaused = 1 << 25; const kPaused = 1 << 26; +const kDataListening = 1 << 27; // TODO(benjamingr) it is likely slower to do it this way than with free functions function makeBitMapDescriptor(bit) { @@ -531,8 +532,7 @@ function canPushMore(state) { } function addChunk(stream, state, chunk, addToFront) { - if ((state[kState] & (kFlowing | kSync)) === kFlowing && state.length === 0 && - stream.listenerCount('data') > 0) { + if ((state[kState] & (kFlowing | kSync | kDataListening)) === (kFlowing | kDataListening) && state.length === 0) { // Use the guard to avoid creating `Set()` repeatedly // when we have multiple pipes. if ((state[kState] & kMultiAwaitDrain) !== 0) { @@ -1058,7 +1058,7 @@ function pipeOnDrain(src, dest) { } if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && - src.listenerCount('data')) { + (state[kState] & kDataListening) !== 0) { src.resume(); } }; @@ -1105,6 +1105,8 @@ Readable.prototype.on = function(ev, fn) { const state = this._readableState; if (ev === 'data') { + state[kState] |= kDataListening; + // Update readableListening so that resume() may be a no-op // a few lines down. This is needed to support once('readable'). state.readableListening = this.listenerCount('readable') > 0; @@ -1131,6 +1133,8 @@ Readable.prototype.on = function(ev, fn) { Readable.prototype.addListener = Readable.prototype.on; Readable.prototype.removeListener = function(ev, fn) { + const state = this._readableState; + const res = Stream.prototype.removeListener.call(this, ev, fn); @@ -1142,6 +1146,8 @@ Readable.prototype.removeListener = function(ev, fn) { // resume within the same tick will have no // effect. process.nextTick(updateReadableListening, this); + } else if (ev === 'data' && this.listenerCount('data') === 0) { + state[kState] &= ~kDataListening; } return res; @@ -1175,7 +1181,7 @@ function updateReadableListening(self) { state[kState] |= kHasFlowing | kFlowing; // Crude way to check if we should resume. - } else if (self.listenerCount('data') > 0) { + } else if ((state[kState] & kDataListening) !== 0) { self.resume(); } else if ((state[kState] & kReadableListening) === 0) { state[kState] &= ~(kHasFlowing | kFlowing);