From 7cce7fa18f98219c23f0a207b5c7aadfce12d2ea Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 3 Nov 2018 19:00:41 +0100 Subject: [PATCH] net,http2: merge after-write code PR-URL: https://github.com/nodejs/node/pull/24380 Refs: https://github.com/nodejs/node/issues/19060 Reviewed-By: James M Snell Reviewed-By: Daniel Bevenius --- lib/internal/http2/core.js | 25 +++++++----------- lib/internal/stream_base_commons.js | 28 ++++++++++++++++++-- lib/net.js | 40 +++++------------------------ src/node_http2.cc | 8 ++++-- 4 files changed, 47 insertions(+), 54 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 9c17f8077b3331..ecf243f845b6d5 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -108,6 +108,7 @@ const { writeGeneric, writevGeneric, onStreamRead, + kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer } = require('internal/stream_base_commons'); @@ -1515,21 +1516,6 @@ function trackWriteState(stream, bytes) { session[kHandle].chunksSentSinceLastWrite = 0; } -function afterDoStreamWrite(status, handle) { - const stream = handle[kOwner]; - const session = stream[kSession]; - - stream[kUpdateTimer](); - - const { bytes } = this; - stream[kState].writeQueueSize -= bytes; - - if (session !== undefined) - session[kState].writeQueueSize -= bytes; - if (typeof this.callback === 'function') - this.callback(null); -} - function streamOnResume() { if (!this.destroyed) this[kHandle].readStart(); @@ -1782,6 +1768,13 @@ class Http2Stream extends Duplex { 'bug in Node.js'); } + [kAfterAsyncWrite]({ bytes }) { + this[kState].writeQueueSize -= bytes; + + if (this.session !== undefined) + this.session[kState].writeQueueSize -= bytes; + } + [kWriteGeneric](writev, data, encoding, cb) { // When the Http2Stream is first created, it is corked until the // handle and the stream ID is assigned. However, if the user calls @@ -1808,7 +1801,7 @@ class Http2Stream extends Duplex { if (!this.headersSent) this[kProceed](); - const req = createWriteWrap(this[kHandle], afterDoStreamWrite); + const req = createWriteWrap(this[kHandle]); req.stream = this[kID]; if (writev) diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 709395fa910cb2..31291e751d57a6 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -16,6 +16,7 @@ const { owner_symbol } = require('internal/async_hooks').symbols; const kMaybeDestroy = Symbol('kMaybeDestroy'); const kUpdateTimer = Symbol('kUpdateTimer'); +const kAfterAsyncWrite = Symbol('kAfterAsyncWrite'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -52,11 +53,33 @@ function handleWriteReq(req, data, encoding) { } } -function createWriteWrap(handle, oncomplete) { +function onWriteComplete(status) { + const stream = this.handle[owner_symbol]; + + if (stream.destroyed) { + if (typeof this.callback === 'function') + this.callback(null); + return; + } + + if (status < 0) { + const ex = errnoException(status, 'write', this.error); + stream.destroy(ex, this.callback); + return; + } + + stream[kUpdateTimer](); + stream[kAfterAsyncWrite](this); + + if (typeof this.callback === 'function') + this.callback(null); +} + +function createWriteWrap(handle) { var req = new WriteWrap(); req.handle = handle; - req.oncomplete = oncomplete; + req.oncomplete = onWriteComplete; req.async = false; req.bytes = 0; req.buffer = null; @@ -160,6 +183,7 @@ module.exports = { writevGeneric, writeGeneric, onStreamRead, + kAfterAsyncWrite, kMaybeDestroy, kUpdateTimer, }; diff --git a/lib/net.js b/lib/net.js index 08d08888dedc34..01cfed98e872a4 100644 --- a/lib/net.js +++ b/lib/net.js @@ -62,6 +62,7 @@ const { writevGeneric, writeGeneric, onStreamRead, + kAfterAsyncWrite, kUpdateTimer } = require('internal/stream_base_commons'); const { @@ -685,6 +686,10 @@ protoGetter('localPort', function localPort() { }); +Socket.prototype[kAfterAsyncWrite] = function() { + this[kLastWriteQueueSize] = 0; +}; + Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while @@ -707,7 +712,7 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) { this._unrefTimer(); - var req = createWriteWrap(this._handle, afterWrite); + var req = createWriteWrap(this._handle); if (writev) writevGeneric(this, req, data, cb); else @@ -771,39 +776,6 @@ protoGetter('bytesWritten', function bytesWritten() { }); -function afterWrite(status, handle, err) { - var self = handle[owner_symbol]; - if (self !== process.stderr && self !== process.stdout) - debug('afterWrite', status); - - if (this.async) - self[kLastWriteQueueSize] = 0; - - // callback may come after call to destroy. - if (self.destroyed) { - debug('afterWrite destroyed'); - if (this.callback) - this.callback(null); - return; - } - - if (status < 0) { - var ex = errnoException(status, 'write', this.error); - debug('write failure', ex); - self.destroy(ex, this.callback); - return; - } - - self._unrefTimer(); - - if (self !== process.stderr && self !== process.stdout) - debug('afterWrite call cb'); - - if (this.callback) - this.callback.call(undefined); -} - - function checkBindError(err, port, handle) { // EADDRINUSE may not be reported until we call listen() or connect(). // To complicate matters, a failed bind() followed by listen() or connect() diff --git a/src/node_http2.cc b/src/node_http2.cc index f92d39f655b7e2..20cfcf3745c81b 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1571,8 +1571,12 @@ void Http2Session::ClearOutgoing(int status) { current_outgoing_buffers_.swap(outgoing_buffers_); for (const nghttp2_stream_write& wr : current_outgoing_buffers_) { WriteWrap* wrap = wr.req_wrap; - if (wrap != nullptr) - wrap->Done(status); + if (wrap != nullptr) { + // TODO(addaleax): Pass `status` instead of 0, so that we actually error + // out with the error from the write to the underlying protocol, + // if one occurred. + wrap->Done(0); + } } }