Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_hooks initial implementation #11883

Closed
wants to merge 12 commits into from
Prev Previous commit
Next Next commit
lib: implement aysnc_hooks API in core
Implement async_hooks support in the following:

* fatalException handler
* process.nextTick
* Timers
* net/dgram/http
  • Loading branch information
trevnorris committed Apr 12, 2017
commit 3783c985e68631f498100bad85a8b7e677483025
9 changes: 7 additions & 2 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const net = require('net');
const util = require('util');
const EventEmitter = require('events');
const debug = util.debuglog('http');
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

// New Agent code.

Expand Down Expand Up @@ -93,6 +95,7 @@ function Agent(options) {
self.freeSockets[name] = freeSockets;
socket.setKeepAlive(true, self.keepAliveMsecs);
socket.unref();
socket[async_id_symbol] = -1;
socket._httpMessage = null;
self.removeSocket(socket, options);
freeSockets.push(socket);
Expand Down Expand Up @@ -162,6 +165,8 @@ Agent.prototype.addRequest = function addRequest(req, options) {
if (freeLen) {
// we have a free socket, so use that.
var socket = this.freeSockets[name].shift();
// Assign the handle a new asyncId and run any init() hooks.
socket._handle.asyncReset();
debug('have free socket');

// don't leak
Expand All @@ -176,7 +181,7 @@ Agent.prototype.addRequest = function addRequest(req, options) {
// If we are under maxSockets create a new one.
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down Expand Up @@ -289,7 +294,7 @@ Agent.prototype.removeSocket = function removeSocket(s, options) {
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
nextTick(newSocket._handle.getAsyncId(), function() {
req.emit('error', err);
});
return;
Expand Down
6 changes: 5 additions & 1 deletion lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const Agent = require('_http_agent');
const Buffer = require('buffer').Buffer;
const urlToOptions = require('internal/url').urlToOptions;
const outHeadersKey = require('internal/http').outHeadersKey;
const nextTick = require('internal/process/next_tick').nextTick;

// The actual list of disallowed characters in regexp form is more like:
// /[^A-Za-z0-9\-._~!$&'()*+,;=/:@]/
Expand Down Expand Up @@ -577,9 +578,12 @@ function responseKeepAlive(res, req) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
socket.once('error', freeSocketErrorListener);
// There are cases where _handle === null. Avoid those. Passing null to
// nextTick() will call initTriggerId() to retrieve the id.
const asyncId = socket._handle ? socket._handle.getAsyncId() : null;
// Mark this socket as available, AFTER user-added end
// handlers have a chance to run.
process.nextTick(emitFreeNT, socket);
nextTick(asyncId, emitFreeNT, socket);
}
}

Expand Down
8 changes: 7 additions & 1 deletion lib/_http_common.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const HTTPParser = binding.HTTPParser;
const FreeList = require('internal/freelist').FreeList;
const ondrain = require('internal/http').ondrain;
const incoming = require('_http_incoming');
const emitDestroy = require('async_hooks').emitDestroy;
const IncomingMessage = incoming.IncomingMessage;
const readStart = incoming.readStart;
const readStop = incoming.readStop;
Expand Down Expand Up @@ -211,8 +212,13 @@ function freeParser(parser, req, socket) {
parser.incoming = null;
parser.outgoing = null;
parser[kOnExecute] = null;
if (parsers.free(parser) === false)
if (parsers.free(parser) === false) {
parser.close();
} else {
// Since the Parser destructor isn't going to run the destroy() callbacks
// it needs to be triggered manually.
emitDestroy(parser.getAsyncId());
}
}
if (req) {
req.parser = null;
Expand Down
12 changes: 9 additions & 3 deletions lib/_http_outgoing.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const common = require('_http_common');
const checkIsHttpToken = common._checkIsHttpToken;
const checkInvalidHeaderChar = common._checkInvalidHeaderChar;
const outHeadersKey = require('internal/http').outHeadersKey;
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

const CRLF = common.CRLF;
const debug = common.debug;
Expand Down Expand Up @@ -265,8 +267,9 @@ function _writeRaw(data, encoding, callback) {
if (this.output.length) {
this._flushOutput(conn);
} else if (!data.length) {
if (typeof callback === 'function')
process.nextTick(callback);
if (typeof callback === 'function') {
nextTick(this.socket[async_id_symbol], callback);
}
return true;
}
// Directly write to socket.
Expand Down Expand Up @@ -624,7 +627,10 @@ const crlf_buf = Buffer.from('\r\n');
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
if (this.finished) {
var err = new Error('write after end');
process.nextTick(writeAfterEndNT.bind(this), err, callback);
nextTick(this.socket[async_id_symbol],
writeAfterEndNT.bind(this),
err,
callback);

return true;
}
Expand Down
6 changes: 3 additions & 3 deletions lib/async_hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var processing_hook = false;
// Use to temporarily store and updated active_hooks_array if the user enables
// or disables a hook while hooks are being processed.
var tmp_active_hooks_array = null;
// Keep track of the field counds held in tmp_active_hooks_array.
// Keep track of the field counts held in tmp_active_hooks_array.
var tmp_async_hook_fields = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo in the comment: counts


// Each constant tracks how many callbacks there are for any given step of
Expand All @@ -41,9 +41,9 @@ var tmp_async_hook_fields = null;
const { kInit, kBefore, kAfter, kDestroy, kCurrentAsyncId, kCurrentTriggerId,
kAsyncUidCntr, kInitTriggerId } = async_wrap.constants;

const { async_id_symbol, trigger_id_symbol } = async_wrap;

// Used in AsyncHook and AsyncEvent.
const async_id_symbol = Symbol('_asyncId');
const trigger_id_symbol = Symbol('_triggerId');
const init_symbol = Symbol('init');
const before_symbol = Symbol('before');
const after_symbol = Symbol('after');
Expand Down
12 changes: 10 additions & 2 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ const assert = require('assert');
const Buffer = require('buffer').Buffer;
const util = require('util');
const EventEmitter = require('events');
const setInitTriggerId = require('async_hooks').setInitTriggerId;
const UV_UDP_REUSEADDR = process.binding('constants').os.UV_UDP_REUSEADDR;
const async_id_symbol = process.binding('async_wrap').async_id_symbol;
const nextTick = require('internal/process/next_tick').nextTick;

const UDP = process.binding('udp_wrap').UDP;
const SendWrap = process.binding('udp_wrap').SendWrap;
Expand Down Expand Up @@ -111,6 +114,7 @@ function Socket(type, listener) {
this._handle = handle;
this._receiving = false;
this._bindState = BIND_STATE_UNBOUND;
this[async_id_symbol] = this._handle.getAsyncId();
this.type = type;
this.fd = null; // compatibility hack

Expand Down Expand Up @@ -433,6 +437,10 @@ function doSend(ex, self, ip, list, address, port, callback) {
req.callback = callback;
req.oncomplete = afterSend;
}
// node::SendWrap isn't instantiated and attached to the JS instance of
// SendWrap above until send() is called. So don't set the init trigger id
// until now.
setInitTriggerId(self[async_id_symbol]);
var err = self._handle.send(req,
list,
list.length,
Expand All @@ -442,7 +450,7 @@ function doSend(ex, self, ip, list, address, port, callback) {
if (err && callback) {
// don't emit as error, dgram_legacy.js compatibility
const ex = exceptionWithHostPort(err, 'send', address, port);
process.nextTick(callback, ex);
nextTick(self[async_id_symbol], callback, ex);
}
}

Expand All @@ -469,7 +477,7 @@ Socket.prototype.close = function(callback) {
this._stopReceiving();
this._handle.close();
this._handle = null;
process.nextTick(socketCloseNT, this);
nextTick(this[async_id_symbol], socketCloseNT, this);

return this;
};
Expand Down
24 changes: 23 additions & 1 deletion lib/internal/bootstrap_node.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,20 @@
}

function setupProcessFatal() {
const async_wrap = process.binding('async_wrap');
// Arrays containing hook flags and ids for async_hook calls.
const { async_hook_fields, async_uid_fields } = async_wrap;
// Internal functions needed to manipulate the stack.
const { clearIdStack, popAsyncIds } = async_wrap;
const { kAfter, kCurrentAsyncId, kInitTriggerId } = async_wrap.constants;

process._fatalException = function(er) {
var caught;

// It's possible that kInitTriggerId was set for a constructor call that
// threw and was never cleared. So clear it now.
async_uid_fields[kInitTriggerId] = 0;

if (process.domain && process.domain._errorHandler)
caught = process.domain._errorHandler(er) || caught;

Expand All @@ -327,9 +337,21 @@
// nothing to be done about it at this point.
}

// if we handled an error, then make sure any ticks get processed
} else {
// If we handled an error, then make sure any ticks get processed
NativeModule.require('timers').setImmediate(process._tickCallback);

// Emit the after() hooks now that the exception has been handled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The comment is fine, but could you add a lot of details about why the after hooks needs to be emitted, and why we do this after setImmediate() is called.
  • The setImmediate() is a new resource, so is that going to cause issues in the following cleanup?

if (async_hook_fields[kAfter] > 0) {
do {
NativeModule.require('async_hooks').emitAfter(
async_uid_fields[kCurrentAsyncId]);
// popAsyncIds() returns true if there are more ids on the stack.
} while (popAsyncIds(async_uid_fields[kCurrentAsyncId]));
// Or completely empty the id stack.
} else {
clearIdStack();
}
}

return caught;
Expand Down
Loading