From 220277409e5912f012243ff5cc68eedfad4c22ff Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Wed, 28 Oct 2015 16:36:34 +1100 Subject: [PATCH] http: fix stalled pipeline bug (v3.x backport) Original commit ab03635: This is a two-part fix: - Fix pending data notification in `OutgoingMessage` to notify server about flushed data too - Fix pause/resume behavior for the consumed socket. `resume` event is emitted on a next tick, and `socket._paused` can already be `true` at this time. Pause the socket again to avoid PAUSED error on parser. Fix: https://github.com/nodejs/node/issues/3332 PR-URL: https://github.com/nodejs/node/pull/3342 Reviewed-By: James M Snell Reviewed-By: Trevor Norris --- lib/_http_outgoing.js | 62 +++++++++---------- test/parallel/test-http-pipeline-regr-3332.js | 41 ++++++++++++ 2 files changed, 72 insertions(+), 31 deletions(-) create mode 100644 test/parallel/test-http-pipeline-regr-3332.js diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index c550bcf14635c0..65d77959e4c5a8 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -124,7 +124,7 @@ OutgoingMessage.prototype._send = function(data, encoding, callback) { this.outputEncodings.unshift('binary'); this.outputCallbacks.unshift(null); this.outputSize += this._header.length; - if (this._onPendingData !== null) + if (typeof this._onPendingData === 'function') this._onPendingData(this._header.length); } this._headerSent = true; @@ -147,20 +147,7 @@ OutgoingMessage.prototype._writeRaw = function(data, encoding, callback) { // There might be pending data in the this.output buffer. var outputLength = this.output.length; if (outputLength > 0) { - var output = this.output; - var outputEncodings = this.outputEncodings; - var outputCallbacks = this.outputCallbacks; - for (var i = 0; i < outputLength; i++) { - connection.write(output[i], outputEncodings[i], - outputCallbacks[i]); - } - - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - if (this._onPendingData !== null) - this._onPendingData(-this.outputSize); - this.outputSize = 0; + this._flushOutput(connection); } else if (data.length === 0) { if (typeof callback === 'function') process.nextTick(callback); @@ -185,7 +172,7 @@ OutgoingMessage.prototype._buffer = function(data, encoding, callback) { this.outputEncodings.push(encoding); this.outputCallbacks.push(callback); this.outputSize += data.length; - if (this._onPendingData !== null) + if (typeof this._onPendingData === 'function') this._onPendingData(data.length); return false; }; @@ -618,24 +605,11 @@ OutgoingMessage.prototype._finish = function() { // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function() { var socket = this.socket; - var outputLength, ret; + var ret; if (socket && socket.writable) { // There might be remaining data in this.output; write it out - outputLength = this.output.length; - if (outputLength > 0) { - var output = this.output; - var outputEncodings = this.outputEncodings; - var outputCallbacks = this.outputCallbacks; - for (var i = 0; i < outputLength; i++) { - ret = socket.write(output[i], outputEncodings[i], - outputCallbacks[i]); - } - - this.output = []; - this.outputEncodings = []; - this.outputCallbacks = []; - } + ret = this._flushOutput(socket); if (this.finished) { // This is a queue to the server or client to bring in the next this. @@ -647,6 +621,32 @@ OutgoingMessage.prototype._flush = function() { } }; +OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) { + var ret; + var outputLength = this.output.length; + if (outputLength <= 0) + return ret; + + var output = this.output; + var outputEncodings = this.outputEncodings; + var outputCallbacks = this.outputCallbacks; + socket.cork(); + for (var i = 0; i < outputLength; i++) { + ret = socket.write(output[i], outputEncodings[i], + outputCallbacks[i]); + } + socket.uncork(); + + this.output = []; + this.outputEncodings = []; + this.outputCallbacks = []; + if (typeof this._onPendingData === 'function') + this._onPendingData(-this.outputSize); + this.outputSize = 0; + + return ret; +}; + OutgoingMessage.prototype.flushHeaders = function() { if (!this._header) { diff --git a/test/parallel/test-http-pipeline-regr-3332.js b/test/parallel/test-http-pipeline-regr-3332.js new file mode 100644 index 00000000000000..061e202d975f32 --- /dev/null +++ b/test/parallel/test-http-pipeline-regr-3332.js @@ -0,0 +1,41 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); +const net = require('net'); + +const big = new Buffer(16 * 1024); +big.fill('A'); + +const COUNT = 1e4; + +var received = 0; + +var client; +const server = http.createServer(function(req, res) { + res.end(big, function() { + if (++received === COUNT) { + server.close(); + client.end(); + } + }); +}).listen(common.PORT, function() { + var req = new Array(COUNT + 1).join('GET / HTTP/1.1\r\n\r\n'); + client = net.connect(common.PORT, function() { + client.write(req); + }); + + // Just let the test terminate instead of hanging + client.on('close', function() { + if (received !== COUNT) + server.close(); + }); + client.resume(); +}); + +process.on('exit', function() { + // The server should pause connection on pipeline flood, but it shoul still + // resume it and finish processing the requests, when its output queue will + // be empty again. + assert.equal(received, COUNT); +});