Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Hooks - Initial Implementation #12892

Closed
wants to merge 11 commits into from
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ test: all
$(MAKE) build-addons-napi
$(MAKE) cctest
$(PYTHON) tools/test.py --mode=release -J \
doctool inspector known_issues message pseudo-tty parallel sequential $(CI_NATIVE_SUITES)
doctool inspector known_issues message pseudo-tty parallel sequential \
async-hooks $(CI_NATIVE_SUITES)
$(MAKE) lint

test-parallel: all
Expand Down Expand Up @@ -326,7 +327,7 @@ test-all-valgrind: test-build
$(PYTHON) tools/test.py --mode=debug,release --valgrind

CI_NATIVE_SUITES := addons addons-napi
CI_JS_SUITES := doctool inspector known_issues message parallel pseudo-tty sequential
CI_JS_SUITES := doctool inspector known_issues message parallel pseudo-tty sequential async-hooks

# Build and test addons without building anything else
test-ci-native: LOGLEVEL := info
Expand Down Expand Up @@ -418,6 +419,9 @@ test-timers:
test-timers-clean:
$(MAKE) --directory=tools clean

test-async-hooks:
$(PYTHON) tools/test.py --mode=release async-hooks


ifneq ("","$(wildcard deps/v8/tools/run-tests.py)")
test-v8: v8
Expand Down
9 changes: 7 additions & 2 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const net = require('net');
const util = require('util');
const EventEmitter = require('events');
const debug = util.debuglog('http');
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

// New Agent code.

Expand Down Expand Up @@ -93,6 +95,7 @@ function Agent(options) {
self.freeSockets[name] = freeSockets;
socket.setKeepAlive(true, self.keepAliveMsecs);
socket.unref();
socket[async_id_symbol] = -1;
socket._httpMessage = null;
self.removeSocket(socket, options);
freeSockets.push(socket);
Expand Down Expand Up @@ -163,6 +166,8 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
if (freeLen) {
// we have a free socket, so use that.
var socket = this.freeSockets[name].shift();
// Assign the handle a new asyncId and run any init() hooks.
socket._handle.asyncReset();
debug('have free socket');

// don't leak
Expand All @@ -177,7 +182,7 @@ Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
// If we are under maxSockets create a new one.
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down Expand Up @@ -290,7 +295,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down
6 changes: 5 additions & 1 deletion lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const Agent = require('_http_agent');
const Buffer = require('buffer').Buffer;
const urlToOptions = require('internal/url').urlToOptions;
const outHeadersKey = require('internal/http').outHeadersKey;
const nextTick = require('internal/process/next_tick').nextTick;

// The actual list of disallowed characters in regexp form is more like:
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
Expand Down Expand Up @@ -587,9 +588,12 @@ function responseKeepAlive(res, req) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.once('error', freeSocketErrorListener);
// There are cases where _handle === null. Avoid those. Passing null to
// nextTick() will call initTriggerId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : null;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
process.nextTick(emitFreeNT, socket);
nextTick(asyncId, emitFreeNT, socket);
}
}

Expand Down
8 changes: 7 additions & 1 deletion lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser;
const FreeList = require('internal/freelist');
const ondrain = require('internal/http').ondrain;
const incoming = require('_http_incoming');
const emitDestroy = require('async_hooks').emitDestroy;
const IncomingMessage = incoming.IncomingMessage;
const readStart = incoming.readStart;
const readStop = incoming.readStop;
Expand Down Expand Up @@ -211,8 +212,13 @@ function freeParser(parser, req, socket) {
parser.incoming = null;
parser.outgoing = null;
parser[kOnExecute] = null;
if (parsers.free(parser) === false)
if (parsers.free(parser) === false) {
parser.close();
} else {
// Since the Parser destructor isn't going to run the destroy() callbacks
// it needs to be triggered manually.
emitDestroy(parser.getAsyncId());
}
}
if (req) {
req.parser = null;
Expand Down
12 changes: 9 additions & 3 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const outHeadersKey = require('internal/http').outHeadersKey;
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

const CRLF = common.CRLF;
const debug = common.debug;
Expand Down Expand Up @@ -264,8 +266,9 @@ function _writeRaw(data, encoding, callback) {
if (this.output.length) {
this._flushOutput(conn);
} else if (!data.length) {
if (typeof callback === 'function')
process.nextTick(callback);
if (typeof callback === 'function') {
nextTick(this.socket[async_id_symbol], callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably use conn instead of this.socket to be extra safe.

}
return true;
}
// Directly write to socket.
Expand Down Expand Up @@ -623,7 +626,10 @@ const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (this.finished) {
var err = new Error('write after end');
process.nextTick(writeAfterEndNT.bind(this), err, callback);
nextTick(this.socket[async_id_symbol],
writeAfterEndNT.bind(this),
err,
callback);

return true;
}
Expand Down
Loading