Skip to content

Commit

Permalink
http: reuse socket only when it is drained
Browse files Browse the repository at this point in the history
Ensuring every request is assigned to a drained socket or nothing.
Because is has no benifit for a request to be attached to a non
drained socket and it prevents the request from being assigned to
a drained one, which might occur soon or already in the free pool
We achieve this by claiming a socket as free only when the socket
is drained.

PR-URL: #43902
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Paolo Insogna <paolo@cowtech.it>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
  • Loading branch information
ywave620 authored Jul 27, 2022
1 parent 26e2742 commit 93e0bf9
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 4 deletions.
12 changes: 9 additions & 3 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
req.on('prefinish', requestOnPrefinish);
req.on('finish', requestOnFinish);
socket.on('timeout', responseOnTimeout);

// If the user did not listen for the 'response' event, then they
Expand Down Expand Up @@ -730,12 +730,16 @@ function responseOnEnd() {
socket.end();
}
assert(!socket.writable);
} else if (req.finished && !this.aborted) {
} else if (req.writableFinished && !this.aborted) {
assert(req.finished);
// We can assume `req.finished` means all data has been written since:
// - `'responseOnEnd'` means we have been assigned a socket.
// - when we have a socket we write directly to it without buffering.
// - `req.finished` means `end()` has been called and no further data.
// can be written
// In addition, `req.writableFinished` means all data written has been
// accepted by the kernel. (i.e. the `req.socket` is drained).Without
// this constraint, we may assign a non drained socket to a request.
responseKeepAlive(req);
}
}
Expand All @@ -748,7 +752,9 @@ function responseOnTimeout() {
res.emit('timeout');
}

function requestOnPrefinish() {
// This function is necessary in the case where we receive the entire reponse
// from server before we finish sending out the request
function requestOnFinish() {
const req = this;

if (req.shouldKeepAlive && req._ended)
Expand Down
4 changes: 3 additions & 1 deletion lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
};


// This function is called once all user data are flushed to the socket.
// Note that it has a chance that the socket is not drained.
OutgoingMessage.prototype._finish = function _finish() {
assert(this.socket);
this.emit('prefinish');
Expand All @@ -1008,7 +1010,7 @@ OutgoingMessage.prototype._finish = function _finish() {
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
//
// This function, outgoingFlush(), is called by both the Server and Client
// This function, _flush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function _flush() {
const socket = this.socket;
Expand Down
122 changes: 122 additions & 0 deletions test/parallel/test-http-agent-reuse-drained-socket-only.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');

const agent = new http.Agent({
keepAlive: true,
maxFreeSockets: Infinity,
maxSockets: Infinity,
maxTotalSockets: Infinity,
});

const server = net.createServer({
pauseOnConnect: true,
}, (sock) => {
// Do not read anything from `sock`
sock.pause();
sock.write('HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\n\r\n');
});

server.listen(0, common.mustCall(() => {
sendFstReq(server.address().port);
}));

function sendFstReq(serverPort) {
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, (res) => {
res.on('data', noop);
res.on('end', common.mustCall(() => {
// Agent's socket reusing code is registered to process.nextTick(),
// and will be run after this function, make sure it take effect.
setImmediate(sendSecReq, serverPort, req.socket.localPort);
}));
});

// Make the `req.socket` non drained, i.e. has some data queued to write to
// and accept by the kernel. In Linux and Mac, we only need to call `req.end(aLargeBuffer)`.
// However, in Windows, the mechanism of acceptance is loose, the following code is a workaround
// for Windows.

/**
* https://docs.microsoft.com/en-US/troubleshoot/windows/win32/data-segment-tcp-winsock says
*
* Winsock uses the following rules to indicate a send completion to the application
* (depending on how the send is invoked, the completion notification could be the
* function returning from a blocking call, signaling an event, or calling a notification
* function, and so forth):
* - If the socket is still within SO_SNDBUF quota, Winsock copies the data from the application
* send and indicates the send completion to the application.
* - If the socket is beyond SO_SNDBUF quota and there's only one previously buffered send still
* in the stack kernel buffer, Winsock copies the data from the application send and indicates
* the send completion to the application.
* - If the socket is beyond SO_SNDBUF quota and there's more than one previously buffered send
* in the stack kernel buffer, Winsock copies the data from the application send. Winsock doesn't
* indicate the send completion to the application until the stack completes enough sends to put
* back the socket within SO_SNDBUF quota or only one outstanding send condition.
*/

req.on('socket', () => {
req.socket.on('connect', () => {
// Print tcp send buffer information
console.log(process.report.getReport().libuv.filter((handle) => handle.type === 'tcp'));

const dataLargerThanTCPSendBuf = Buffer.alloc(1024 * 1024 * 64, 0);

req.write(dataLargerThanTCPSendBuf);
req.uncork();
if (process.platform === 'win32') {
assert.ok(req.socket.writableLength === 0);
}

req.write(dataLargerThanTCPSendBuf);
req.uncork();
if (process.platform === 'win32') {
assert.ok(req.socket.writableLength === 0);
}

req.end(dataLargerThanTCPSendBuf);
assert.ok(req.socket.writableLength > 0);
});
});
}

function sendSecReq(serverPort, fstReqCliPort) {
// Make the second request, which should be sent on a new socket
// because the first socket is not drained and hence can not be reused
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, (res) => {
res.on('data', noop);
res.on('end', common.mustCall(() => {
setImmediate(sendThrReq, serverPort, req.socket.localPort);
}));
});

req.on('socket', common.mustCall((sock) => {
assert.notStrictEqual(sock.localPort, fstReqCliPort);
}));
req.end();
}

function sendThrReq(serverPort, secReqCliPort) {
// Make the third request, the agent should reuse the second socket we just made
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, noop);

req.on('socket', common.mustCall((sock) => {
assert.strictEqual(sock.localPort, secReqCliPort);
process.exit(0);
}));
}

function noop() { }

0 comments on commit 93e0bf9

Please sign in to comment.