Skip to content

Commit

Permalink
http: reuse socket only when it is drained
Browse files Browse the repository at this point in the history
Ensuring every request is assigned to a drained socket or nothing.
Because is has no benifit for a request to be attached to a non
drained socket and it prevents the request from being assigned to
a drained one, which might occur soon or already in the free pool
We achieve this by claiming a socket as free only when the socket
is drained.
  • Loading branch information
ywave620 committed Jul 20, 2022
1 parent 3473d1b commit 64b903e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 11 deletions.
13 changes: 9 additions & 4 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ ObjectSetPrototypeOf(ClientRequest.prototype, OutgoingMessage.prototype);
ObjectSetPrototypeOf(ClientRequest, OutgoingMessage);

ClientRequest.prototype._finish = function _finish() {
FunctionPrototypeCall(OutgoingMessage.prototype._finish, this);
if (hasObserver('http')) {
startPerf(this, kClientRequestStatistics, {
type: 'http',
Expand Down Expand Up @@ -658,7 +657,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {

// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
req.on('prefinish', requestOnPrefinish);
req.on('finish', requestOnFinish);
socket.on('timeout', responseOnTimeout);

// If the user did not listen for the 'response' event, then they
Expand Down Expand Up @@ -730,12 +729,16 @@ function responseOnEnd() {
socket.end();
}
assert(!socket.writable);
} else if (req.finished && !this.aborted) {
} else if (req.writableFinished && !this.aborted) {
assert(req.finished);
// We can assume `req.finished` means all data has been written since:
// - `'responseOnEnd'` means we have been assigned a socket.
// - when we have a socket we write directly to it without buffering.
// - `req.finished` means `end()` has been called and no further data.
// can be written
// In addition, `req.writableFinished` means all data written has been
// accepted by the kernel. (i.e. the `req.socket` is drained).Without
// this constraint, we may assign a non drained socket to a request.
responseKeepAlive(req);
}
}
Expand All @@ -748,7 +751,9 @@ function responseOnTimeout() {
res.emit('timeout');
}

function requestOnPrefinish() {
// This function is necessary in the case where we receive the entire reponse
// from server before we finish sending out the request
function requestOnFinish() {
const req = this;

if (req.shouldKeepAlive && req._ended)
Expand Down
12 changes: 6 additions & 6 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const {
} = primordials;

const { getDefaultHighWaterMark } = require('internal/streams/state');
const assert = require('internal/assert');
const EE = require('events');
const Stream = require('stream');
const internalUtil = require('internal/util');
Expand Down Expand Up @@ -985,10 +984,11 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
};


OutgoingMessage.prototype._finish = function _finish() {
assert(this.socket);
this.emit('prefinish');
};
// Subclasses HttpClientRequest and HttpServerResponse should
// implement this function. This function is called once all user data
// are flushed to the socket. Note that it has a chance that the socket
// is not drained.
OutgoingMessage.prototype._finish = function() {};


// This logic is probably a bit confusing. Let me explain a bit:
Expand All @@ -1008,7 +1008,7 @@ OutgoingMessage.prototype._finish = function _finish() {
// the socket yet. Thus the outgoing messages need to be prepared to queue
// up data internally before sending it on further to the socket's queue.
//
// This function, outgoingFlush(), is called by both the Server and Client
// This function, _flush(), is called by both the Server and Client
// to attempt to flush any pending messages out to the socket.
OutgoingMessage.prototype._flush = function _flush() {
const socket = this.socket;
Expand Down
1 change: 0 additions & 1 deletion lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ ServerResponse.prototype._finish = function _finish() {
},
});
}
OutgoingMessage.prototype._finish.call(this);
};


Expand Down
79 changes: 79 additions & 0 deletions test/parallel/test-http-agent-reuse-drained-socket-only.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const http = require('http');
const net = require('net');

const agent = new http.Agent({
keepAlive: true,
maxFreeSockets: 1024
});

const server = net.createServer({
pauseOnConnect: true,
}, (sock) => {
// Do not read anything from `sock`
sock.pause();
sock.write('HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\n\r\n');
});

server.listen(0, common.mustCall(() => {
sendFstReq(server.address().port);
}));

function sendFstReq(serverPort) {
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, (res) => {
res.on('data', noop);
res.on('end', common.mustCall(() => {
// Agent's socket reusing code is registered to process.nextTick(),
// to ensure it take effect, fire in the next event loop
setTimeout(sendSecReq, 10, serverPort, req.socket.localPort);
}));
});

// Overwhelm the flow control window
// note that tcp over the loopback inteface has a large flow control window
assert.strictEqual(req.write('a'.repeat(6_000_000)), false);

req.end();
}

function sendSecReq(serverPort, fstReqCliPort) {
// Make the second request, which should be sent on a new socket
// because the first socket is not drained and hence can not be reused
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, (res) => {
res.on('data', noop);
res.on('end', common.mustCall(() => {
setTimeout(sendThrReq, 10, serverPort, req.socket.localPort);
}));
});

req.on('socket', common.mustCall((sock) => {
assert.notStrictEqual(sock.localPort, fstReqCliPort);
}));
req.end();
}

function sendThrReq(serverPort, secReqCliPort) {
// Make the third request, the agent should reuse the second socket we just made
const req = http.request({
agent,
host: '127.0.0.1',
port: serverPort,
}, noop);

req.on('socket', common.mustCall((sock) => {
assert.strictEqual(sock.localPort, secReqCliPort);
process.exit(0);
}));
}

function noop() { }

0 comments on commit 64b903e

Please sign in to comment.