diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 1c1bce6cf2c376..d3228340b4bca9 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -510,20 +510,12 @@ function onEofChunk(stream, state) { } } state.ended = true; + state.needReadable = false; - if (state.sync) { - // If we are sync, wait until next tick to emit the data. - // Otherwise we risk emitting data in the flow() - // the readable code triggers during a read() call - emitReadable(stream); - } else { - // Emit 'readable' now to make sure it gets picked up. - state.needReadable = false; - if (!state.emittedReadable) { - state.emittedReadable = true; - emitReadable_(stream); - } - } + // We are not protecting if emittedReadable = true, + // so 'readable' gets scheduled anyway. + state.emittedReadable = true; + process.nextTick(emitReadable_, stream); } // Don't emit readable right away in sync mode, because this can trigger diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 0b4f845e4c1503..7393dddbabc329 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -116,9 +116,6 @@ function Transform(options) { writeencoding: null }; - // Start out asking for a readable event once data is transformed. - this._readableState.needReadable = true; - // We have implemented the _read method, and done the other things // that Readable wants before the first _read call, so unset the // sync guard flag. diff --git a/test/parallel/test-http-readable-data-event.js b/test/parallel/test-http-readable-data-event.js index ddaeca5fe7103c..21b1fa65c661c8 100644 --- a/test/parallel/test-http-readable-data-event.js +++ b/test/parallel/test-http-readable-data-event.js @@ -29,7 +29,7 @@ const server = http.createServer((req, res) => { }; const expectedData = [helloWorld, helloAgainLater]; - const expectedRead = [helloWorld, null, helloAgainLater, null]; + const expectedRead = [helloWorld, null, helloAgainLater, null, null]; const req = http.request(opts, (res) => { res.on('error', common.mustNotCall()); @@ -42,7 +42,7 @@ const server = http.createServer((req, res) => { assert.strictEqual(data, expectedRead.shift()); next(); } while (data !== null); - }, 2)); + }, 3)); res.setEncoding('utf8'); res.on('data', common.mustCall((data) => { diff --git a/test/parallel/test-stream-readable-emit-readable-short-stream.js b/test/parallel/test-stream-readable-emit-readable-short-stream.js new file mode 100644 index 00000000000000..2f4f43baf5a848 --- /dev/null +++ b/test/parallel/test-stream-readable-emit-readable-short-stream.js @@ -0,0 +1,146 @@ +'use strict'; + +const common = require('../common'); +const stream = require('stream'); +const assert = require('assert'); + +{ + const r = new stream.Readable({ + read: common.mustCall(function() { + this.push('content'); + this.push(null); + }) + }); + + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + r.pipe(t); + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.end('content'); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.write('content'); + t.end(); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); +} + +{ + const t = new stream.Readable({ + read() { + } + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + t.push('content'); + t.push(null); +} + +{ + const t = new stream.Readable({ + read() { + } + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + process.nextTick(() => { + t.push('content'); + t.push(null); + }); +} + +{ + const t = new stream.Transform({ + transform: common.mustCall(function(chunk, encoding, callback) { + this.push(chunk); + return callback(); + }), + flush: common.mustCall(function(callback) { + return callback(); + }) + }); + + t.on('readable', common.mustCall(function() { + while (true) { + const chunk = t.read(); + if (!chunk) + break; + assert.strictEqual(chunk.toString(), 'content'); + } + }, 2)); + + t.write('content'); + t.end(); +} diff --git a/test/parallel/test-stream-readable-emittedReadable.js b/test/parallel/test-stream-readable-emittedReadable.js index ba613f9e9ff19d..6580c36303e11e 100644 --- a/test/parallel/test-stream-readable-emittedReadable.js +++ b/test/parallel/test-stream-readable-emittedReadable.js @@ -43,12 +43,23 @@ const noRead = new Readable({ read: () => {} }); -noRead.on('readable', common.mustCall(() => { +noRead.once('readable', common.mustCall(() => { // emittedReadable should be true when the readable event is emitted assert.strictEqual(noRead._readableState.emittedReadable, true); noRead.read(0); // emittedReadable is not reset during read(0) assert.strictEqual(noRead._readableState.emittedReadable, true); + + noRead.on('readable', common.mustCall(() => { + // The second 'readable' is emitted because we are ending + + // emittedReadable should be true when the readable event is emitted + assert.strictEqual(noRead._readableState.emittedReadable, false); + noRead.read(0); + // emittedReadable is not reset during read(0) + assert.strictEqual(noRead._readableState.emittedReadable, false); + + })); })); noRead.push('foo'); diff --git a/test/parallel/test-stream-readable-needReadable.js b/test/parallel/test-stream-readable-needReadable.js index 7058e123f07823..54618b5e8ab14c 100644 --- a/test/parallel/test-stream-readable-needReadable.js +++ b/test/parallel/test-stream-readable-needReadable.js @@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => { // When the readable event fires, needReadable is reset. assert.strictEqual(readable._readableState.needReadable, false); readable.read(); -})); +}, 2)); // If a readable listener is attached, then a readable event is needed. assert.strictEqual(readable._readableState.needReadable, true); @@ -74,12 +74,14 @@ const slowProducer = new Readable({ }); slowProducer.on('readable', common.mustCall(() => { - if (slowProducer.read(8) === null) { + const chunk = slowProducer.read(8); + const state = slowProducer._readableState; + if (chunk === null) { // The buffer doesn't have enough data, and the stream is not need, // we need to notify the reader when data arrives. - assert.strictEqual(slowProducer._readableState.needReadable, true); + assert.strictEqual(state.needReadable, true); } else { - assert.strictEqual(slowProducer._readableState.needReadable, false); + assert.strictEqual(state.needReadable, false); } }, 4)); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index f5643205da0596..e72159d1c9be94 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -31,7 +31,7 @@ const Readable = require('stream').Readable; assert.strictEqual(state.reading, false); } - const expectedReadingMore = [true, false]; + const expectedReadingMore = [true, false, false]; readable.on('readable', common.mustCall(() => { // There is only one readingMore scheduled from on('data'), // after which everything is governed by the .read() call @@ -40,10 +40,12 @@ const Readable = require('stream').Readable; // If the stream has ended, we shouldn't be reading assert.strictEqual(state.ended, !state.reading); - const data = readable.read(); - if (data === null) // reached end of stream + // consume all the data + while (readable.read() !== null) {} + + if (expectedReadingMore.length === 0) // reached end of stream process.nextTick(common.mustCall(onStreamEnd, 1)); - }, 2)); + }, 3)); readable.on('end', common.mustCall(onStreamEnd)); readable.push('pushed'); diff --git a/test/parallel/test-stream2-httpclient-response-end.js b/test/parallel/test-stream2-httpclient-response-end.js index 27d31e50a96a7e..8b2920668cd703 100644 --- a/test/parallel/test-stream2-httpclient-response-end.js +++ b/test/parallel/test-stream2-httpclient-response-end.js @@ -11,8 +11,11 @@ const server = http.createServer(function(req, res) { let data = ''; res.on('readable', common.mustCall(function() { console.log('readable event'); - data += res.read(); - })); + let chunk; + while ((chunk = res.read()) !== null) { + data += chunk; + } + }, 2)); res.on('end', common.mustCall(function() { console.log('end event'); assert.strictEqual(msg, data); diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index cd6deaabf1147e..2590d5192fe103 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -321,11 +321,16 @@ const Transform = require('_stream_transform'); pt.end(); - assert.strictEqual(emits, 1); - assert.strictEqual(pt.read(5).toString(), 'l'); - assert.strictEqual(pt.read(5), null); + // The next readable is emitted on the next tick. + assert.strictEqual(emits, 0); - assert.strictEqual(emits, 1); + process.on('nextTick', function() { + assert.strictEqual(emits, 1); + assert.strictEqual(pt.read(5).toString(), 'l'); + assert.strictEqual(pt.read(5), null); + + assert.strictEqual(emits, 1); + }); } {