diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 869a5c8ce9a5be..7f5e615a7a88d5 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -282,8 +282,13 @@ function onSessionRead(nread, buf, handle) { 'report this as a bug in Node.js'); _unrefActive(owner); // Reset the session timeout timer _unrefActive(stream); // Reset the stream timeout timer - if (nread >= 0 && !stream.destroyed) - return stream.push(buf); + if (nread >= 0 && !stream.destroyed) { + // prevent overflowing the buffer while pause figures out the + // stream needs to actually pause and streamOnPause runs + if (!stream.push(buf)) + owner[kHandle].streamReadStop(id); + return; + } // Last chunk was received. End the readable side. stream.push(null); @@ -1276,8 +1281,6 @@ function onStreamClosed(code) { } function streamOnResume() { - if (this._paused) - return this.pause(); if (this[kID] === undefined) { this.once('ready', streamOnResume); return; @@ -1299,12 +1302,10 @@ function streamOnPause() { } } -function streamOnDrain() { - const needPause = 0 > this._writableState.highWaterMark; - if (this._paused && !needPause) { - this._paused = false; - this.resume(); - } +function handleFlushData(handle, streamID) { + assert(handle.flushData(streamID) === undefined, + `HTTP/2 Stream ${streamID} does not exist. Please report this as ` + + 'a bug in Node.js'); } function streamOnSessionConnect() { @@ -1357,7 +1358,6 @@ class Http2Stream extends Duplex { this.once('finish', onHandleFinish); this.on('resume', streamOnResume); this.on('pause', streamOnPause); - this.on('drain', streamOnDrain); session.once('close', state.closeHandler); if (session[kState].connecting) { @@ -1507,9 +1507,7 @@ class Http2Stream extends Duplex { return; } _unrefActive(this); - assert(this[kSession][kHandle].flushData(this[kID]) === undefined, - 'HTTP/2 Stream #{this[kID]} does not exist. Please report this as ' + - 'a bug in Node.js'); + process.nextTick(handleFlushData, this[kSession][kHandle], this[kID]); } // Submits an RST-STREAM frame to shutdown this stream. diff --git a/src/node_http2.cc b/src/node_http2.cc index ec65e0052f0d7a..d12b9ca000de39 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo& args) { if (!(stream = session->FindStream(id))) { return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID); } - stream->FlushDataChunks(); + stream->ReadResume(); } void Http2Session::UpdateChunksSent(const FunctionCallbackInfo& args) { diff --git a/src/node_http2_core-inl.h b/src/node_http2_core-inl.h index 5f6016a510e136..67bacfbbf07842 100644 --- a/src/node_http2_core-inl.h +++ b/src/node_http2_core-inl.h @@ -510,7 +510,7 @@ inline void Nghttp2Session::SendPendingData() { // the proceed with the rest. while (srcRemaining > destRemaining) { DEBUG_HTTP2("Nghttp2Session %s: pushing %d bytes to the socket\n", - TypeName(), destRemaining); + TypeName(), destLength + destRemaining); memcpy(dest.base + destOffset, src + srcOffset, destRemaining); destLength += destRemaining; Send(&dest, destLength); @@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() { FlushDataChunks(); } +inline void Nghttp2Stream::ReadResume() { + DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_); + flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED; + + // Flush any queued data chunks immediately out to the JS layer + FlushDataChunks(); +} + inline void Nghttp2Stream::ReadStop() { DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_); if (!IsReading()) diff --git a/src/node_http2_core.h b/src/node_http2_core.h index a7808ea0492256..91c58e78bb57ca 100644 --- a/src/node_http2_core.h +++ b/src/node_http2_core.h @@ -384,6 +384,9 @@ class Nghttp2Stream { // the session to be emitted at the JS side inline void ReadStart(); + // Resume Reading + inline void ReadResume(); + // Stop/Pause Reading. inline void ReadStop(); diff --git a/test/parallel/parallel.status b/test/parallel/parallel.status index 2297aad9c88edf..2958fad4d0f8fa 100644 --- a/test/parallel/parallel.status +++ b/test/parallel/parallel.status @@ -18,5 +18,7 @@ test-npm-install: PASS,FLAKY [$system==solaris] # Also applies to SmartOS [$system==freebsd] +test-http2-compat-serverrequest-pipe: PASS,FLAKY +test-http2-pipe: PASS,FLAKY [$system==aix] diff --git a/test/parallel/test-http2-compat-serverrequest-pipe.js b/test/parallel/test-http2-compat-serverrequest-pipe.js index a19b3191874a69..04c8cfe546f329 100644 --- a/test/parallel/test-http2-compat-serverrequest-pipe.js +++ b/test/parallel/test-http2-compat-serverrequest-pipe.js @@ -11,9 +11,9 @@ const path = require('path'); // piping should work as expected with createWriteStream -const loc = fixtures.path('person.jpg'); -const fn = path.join(common.tmpDir, 'http2pipe.jpg'); common.refreshTmpDir(); +const loc = fixtures.path('url-tests.js'); +const fn = path.join(common.tmpDir, 'http2-url-tests.js'); const server = http2.createServer(); @@ -21,7 +21,7 @@ server.on('request', common.mustCall((req, res) => { const dest = req.pipe(fs.createWriteStream(fn)); dest.on('finish', common.mustCall(() => { assert.strictEqual(req.complete, true); - assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn)); + assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length); fs.unlinkSync(fn); res.end(); })); diff --git a/test/parallel/test-http2-pipe.js b/test/parallel/test-http2-pipe.js new file mode 100644 index 00000000000000..819fab5154758d --- /dev/null +++ b/test/parallel/test-http2-pipe.js @@ -0,0 +1,49 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const fixtures = require('../common/fixtures'); +const assert = require('assert'); +const http2 = require('http2'); +const fs = require('fs'); +const path = require('path'); + +// piping should work as expected with createWriteStream + +common.refreshTmpDir(); +const loc = fixtures.path('url-tests.js'); +const fn = path.join(common.tmpDir, 'http2-url-tests.js'); + +const server = http2.createServer(); + +server.on('stream', common.mustCall((stream) => { + const dest = stream.pipe(fs.createWriteStream(fn)); + dest.on('finish', common.mustCall(() => { + assert.strictEqual(fs.readFileSync(loc).length, fs.readFileSync(fn).length); + fs.unlinkSync(fn); + stream.respond(); + stream.end(); + })); +})); + +server.listen(0, common.mustCall(() => { + const port = server.address().port; + const client = http2.connect(`http://localhost:${port}`); + + let remaining = 2; + function maybeClose() { + if (--remaining === 0) { + server.close(); + client.destroy(); + } + } + + const req = client.request({ ':method': 'POST' }); + req.on('response', common.mustCall()); + req.resume(); + req.on('end', common.mustCall(maybeClose)); + const str = fs.createReadStream(loc); + str.on('end', common.mustCall(maybeClose)); + str.pipe(req); +}));