Skip to content

Commit

Permalink
child_process: create proper public API for channel
Browse files Browse the repository at this point in the history
Instead of exposing the C++ bindings object as `subprocess.channel`
or `process.channel`, provide the “control” object that was
previously used internally as the public-facing variant of it.

This should be better than returning the raw pipe object, and
matches the original intention (when the `channel` property was
first added) of providing a proper way to `.ref()` or `.unref()`
the channel.

PR-URL: #30165
Refs: #9322
Refs: #9313
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax authored and BridgeAR committed Jan 3, 2020
1 parent d6a2bad commit e65bed1
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 23 deletions.
20 changes: 20 additions & 0 deletions doc/api/child_process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1018,13 +1018,33 @@ See [Advanced Serialization][] for more details.
### `subprocess.channel`
<!-- YAML
added: v7.1.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30165
description: The object no longer accidentally exposes native C++ bindings.
-->

* {Object} A pipe representing the IPC channel to the child process.

The `subprocess.channel` property is a reference to the child's IPC channel. If
no IPC channel currently exists, this property is `undefined`.

#### `subprocess.channel.ref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel keep the event loop of the parent process
running if `.unref()` has been called before.

#### `subprocess.channel.unref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel not keep the event loop of the parent process
running, and lets it finish even while the channel is open.

### `subprocess.connected`
<!-- YAML
added: v0.7.2
Expand Down
28 changes: 28 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,10 @@ $ bash -c 'exec -a customArgv0 ./node'
## `process.channel`
<!-- YAML
added: v7.1.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/30165
description: The object no longer accidentally exposes native C++ bindings.
-->

* {Object}
Expand All @@ -635,6 +639,30 @@ If the Node.js process was spawned with an IPC channel (see the
property is a reference to the IPC channel. If no IPC channel exists, this
property is `undefined`.

### `process.channel.ref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel keep the event loop of the process
running if `.unref()` has been called before.

Typically, this is managed through the number of `'disconnect'` and `'message'`
listeners on the `process` object. However, this method can be used to
explicitly request a specific behavior.

### `process.channel.unref()`
<!-- YAML
added: v7.1.0
-->

This method makes the IPC channel not keep the event loop of the process
running, and lets it finish even while the channel is open.

Typically, this is managed through the number of `'disconnect'` and `'message'`
listeners on the `process` object. However, this method can be used to
explicitly request a specific behavior.

## `process.chdir(directory)`
<!-- YAML
added: v0.1.17
Expand Down
4 changes: 2 additions & 2 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ function _forkChild(fd, serializationMode) {
p.unref();
const control = setupChannel(process, p, serializationMode);
process.on('newListener', function onNewListener(name) {
if (name === 'message' || name === 'disconnect') control.ref();
if (name === 'message' || name === 'disconnect') control.refCounted();
});
process.on('removeListener', function onRemoveListener(name) {
if (name === 'message' || name === 'disconnect') control.unref();
if (name === 'message' || name === 'disconnect') control.unrefCounted();
});
}

Expand Down
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/switches/is_main_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ function createWritableStdioStream(fd) {
// an error when trying to use it again. In that case, create the socket
// using the existing handle instead of the fd.
if (process.channel && process.channel.fd === fd) {
const { kChannelHandle } = require('internal/child_process');
stream = new net.Socket({
handle: process.channel,
handle: process[kChannelHandle],
readable: false,
writable: true
});
Expand Down
56 changes: 41 additions & 15 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ let freeParser;
let HTTPParser;

const MAX_HANDLE_RETRANSMISSIONS = 3;
const kChannelHandle = Symbol('kChannelHandle');
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');

// This object contain function to convert TCP objects to native handle objects
Expand Down Expand Up @@ -108,7 +109,7 @@ const handleConversion = {
// The worker should keep track of the socket
message.key = socket.server._connectionKey;

const firstTime = !this.channel.sockets.send[message.key];
const firstTime = !this[kChannelHandle].sockets.send[message.key];
const socketList = getSocketList('send', this, message.key);

// The server should no longer expose a .connection property
Expand Down Expand Up @@ -508,30 +509,55 @@ ChildProcess.prototype.unref = function() {
};

class Control extends EventEmitter {
#channel = null;
#refs = 0;
#refExplicitlySet = false;

constructor(channel) {
super();
this.channel = channel;
this.refs = 0;
this.#channel = channel;
}
ref() {
if (++this.refs === 1) {
this.channel.ref();

// The methods keeping track of the counter are being used to track the
// listener count on the child process object as well as when writes are
// in progress. Once the user has explicitly requested a certain state, these
// methods become no-ops in order to not interfere with the user's intentions.
refCounted() {
if (++this.#refs === 1 && !this.#refExplicitlySet) {
this.#channel.ref();
}
}
unref() {
if (--this.refs === 0) {
this.channel.unref();

unrefCounted() {
if (--this.#refs === 0 && !this.#refExplicitlySet) {
this.#channel.unref();
this.emit('unref');
}
}

ref() {
this.#refExplicitlySet = true;
this.#channel.ref();
}

unref() {
this.#refExplicitlySet = true;
this.#channel.unref();
}

get fd() {
return this.#channel ? this.#channel.fd : undefined;
}
}

const channelDeprecationMsg = '_channel is deprecated. ' +
'Use ChildProcess.channel instead.';

let serialization;
function setupChannel(target, channel, serializationMode) {
target.channel = channel;
const control = new Control(channel);
target.channel = control;
target[kChannelHandle] = channel;

ObjectDefineProperty(target, '_channel', {
get: deprecate(() => {
Expand All @@ -547,8 +573,6 @@ function setupChannel(target, channel, serializationMode) {
target._handleQueue = null;
target._pendingMessage = null;

const control = new Control(channel);

if (serialization === undefined)
serialization = require('internal/child_process/serialization');
const {
Expand Down Expand Up @@ -796,11 +820,11 @@ function setupChannel(target, channel, serializationMode) {

if (wasAsyncWrite) {
req.oncomplete = () => {
control.unref();
control.unrefCounted();
if (typeof callback === 'function')
callback(null);
};
control.ref();
control.refCounted();
} else if (typeof callback === 'function') {
process.nextTick(callback, null);
}
Expand Down Expand Up @@ -855,6 +879,7 @@ function setupChannel(target, channel, serializationMode) {

// This marks the fact that the channel is actually disconnected.
this.channel = null;
this[kChannelHandle] = null;

if (this._pendingMessage)
closePendingHandle(this);
Expand Down Expand Up @@ -1011,7 +1036,7 @@ function getValidStdio(stdio, sync) {


function getSocketList(type, worker, key) {
const sockets = worker.channel.sockets[type];
const sockets = worker[kChannelHandle].sockets[type];
let socketList = sockets[key];
if (!socketList) {
const Construct = type === 'send' ? SocketListSend : SocketListReceive;
Expand Down Expand Up @@ -1054,6 +1079,7 @@ function spawnSync(options) {

module.exports = {
ChildProcess,
kChannelHandle,
setupChannel,
getValidStdio,
stdioStringToArray,
Expand Down
9 changes: 6 additions & 3 deletions test/parallel/test-child-process-recv-handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ else
function master() {
// spawn() can only create one IPC channel so we use stdin/stdout as an
// ad-hoc command channel.
const proc = spawn(process.execPath, [__filename, 'worker'], {
const proc = spawn(process.execPath, [
'--expose-internals', __filename, 'worker'
], {
stdio: ['pipe', 'pipe', 'pipe', 'ipc']
});
let handle = null;
Expand All @@ -57,12 +59,13 @@ function master() {
}

function worker() {
process.channel.readStop(); // Make messages batch up.
const { kChannelHandle } = require('internal/child_process');
process[kChannelHandle].readStop(); // Make messages batch up.
process.stdout.ref();
process.stdout.write('ok\r\n');
process.stdin.once('data', common.mustCall((data) => {
assert.strictEqual(data.toString(), 'ok\r\n');
process.channel.readStart();
process[kChannelHandle].readStart();
}));
let n = 0;
process.on('message', common.mustCall((msg, handle) => {
Expand Down
3 changes: 1 addition & 2 deletions test/parallel/test-child-process-silent.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ if (process.argv[2] === 'pipe') {
const child = childProcess.fork(process.argv[1], ['pipe'], { silent: true });

// Allow child process to self terminate
child.channel.close();
child.channel = null;
child.disconnect();

child.on('exit', function() {
process.exit(0);
Expand Down

0 comments on commit e65bed1

Please sign in to comment.