Skip to content

Commit

Permalink
worker: implement MessagePort and MessageChannel
Browse files Browse the repository at this point in the history
Implement `MessagePort` and `MessageChannel` along the lines of
the DOM classes of the same names. `MessagePort`s initially
support transferring only `ArrayBuffer`s.

Thanks to Stephen Belanger for reviewing this change in its
original form, to Benjamin Gruenbaum for reviewing the
added tests in their original form, and to Olivia Hugger
for reviewing the documentation in its original form.

Refs: ayojs/ayo#98

PR-URL: #20876
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Shingo Inoue <leko.noor@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: John-David Dalton <john.david.dalton@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
addaleax committed Jun 6, 2018
1 parent 2e886e9 commit e7a2367
Show file tree
Hide file tree
Showing 23 changed files with 1,140 additions and 2 deletions.
1 change: 1 addition & 0 deletions doc/api/_toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* [Utilities](util.html)
* [V8](v8.html)
* [VM](vm.html)
* [Worker](worker.html)
* [ZLIB](zlib.html)

<div class="line"></div>
Expand Down
1 change: 1 addition & 0 deletions doc/api/all.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@
@include util
@include v8
@include vm
@include worker
@include zlib
16 changes: 16 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel.
Used when the main process is trying to read data from the child process's
STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option.

<a id="ERR_CLOSED_MESSAGE_PORT"></a>
### ERR_CLOSED_MESSAGE_PORT

There was an attempt to use a `MessagePort` instance in a closed
state, usually after `.close()` has been called.

<a id="ERR_CONSOLE_WRITABLE_STREAM"></a>
### ERR_CONSOLE_WRITABLE_STREAM

`Console` was instantiated without `stdout` stream, or `Console` has a
non-writable `stdout` or `stderr` stream.

<a id="ERR_CONSTRUCT_CALL_REQUIRED"></a>
### ERR_CONSTRUCT_CALL_REQUIRED

A constructor for a class was called without `new`.

<a id="ERR_CPU_USAGE"></a>
### ERR_CPU_USAGE

Expand Down Expand Up @@ -1203,6 +1214,11 @@ urlSearchParams.has.call(buf, 'foo');
// Throws a TypeError with code 'ERR_INVALID_THIS'
```

<a id="ERR_INVALID_TRANSFER_OBJECT"></a>
### ERR_INVALID_TRANSFER_OBJECT

An invalid transfer object was passed to `postMessage()`.

<a id="ERR_INVALID_TUPLE"></a>
### ERR_INVALID_TUPLE

Expand Down
146 changes: 146 additions & 0 deletions doc/api/worker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Worker

<!--introduced_in=REPLACEME-->

> Stability: 1 - Experimental
## Class: MessageChannel
<!-- YAML
added: REPLACEME
-->

Instances of the `worker.MessageChannel` class represent an asynchronous,
two-way communications channel.
The `MessageChannel` has no methods of its own. `new MessageChannel()`
yields an object with `port1` and `port2` properties, which refer to linked
[`MessagePort`][] instances.

```js
const { MessageChannel } = require('worker');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// prints: received { foo: 'bar' }
```

## Class: MessagePort
<!-- YAML
added: REPLACEME
-->

* Extends: {EventEmitter}

Instances of the `worker.MessagePort` class represent one end of an
asynchronous, two-way communications channel. It can be used to transfer
structured data, memory regions and other `MessagePort`s between different
[`Worker`][]s.

With the exception of `MessagePort`s being [`EventEmitter`][]s rather
than `EventTarget`s, this implementation matches [browser `MessagePort`][]s.

### Event: 'close'
<!-- YAML
added: REPLACEME
-->

The `'close'` event is emitted once either side of the channel has been
disconnected.

### Event: 'message'
<!-- YAML
added: REPLACEME
-->

* `value` {any} The transmitted value

The `'message'` event is emitted for any incoming message, containing the cloned
input of [`port.postMessage()`][].

Listeners on this event will receive a clone of the `value` parameter as passed
to `postMessage()` and no further arguments.

### port.close()
<!-- YAML
added: REPLACEME
-->

Disables further sending of messages on either side of the connection.
This method can be called once you know that no further communication
will happen over this `MessagePort`.

### port.postMessage(value[, transferList])
<!-- YAML
added: REPLACEME
-->

* `value` {any}
* `transferList` {Object[]}

Sends a JavaScript value to the receiving side of this channel.
`value` will be transferred in a way which is compatible with
the [HTML structured clone algorithm][]. In particular, it may contain circular
references and objects like typed arrays that the `JSON` API is not able
to stringify.

`transferList` may be a list of `ArrayBuffer` objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`).

`value` may still contain `ArrayBuffer` instances that are not in
`transferList`; in that case, the underlying memory is copied rather than moved.

For more information on the serialization and deserialization mechanisms
behind this API, see the [serialization API of the `v8` module][v8.serdes].

Because the object cloning uses the structured clone algorithm,
non-enumerable properties, property accessors, and object prototypes are
not preserved. In particular, [`Buffer`][] objects will be read as
plain [`Uint8Array`][]s on the receiving side.

The message object will be cloned immediately, and can be modified after
posting without having side effects.

### port.ref()
<!-- YAML
added: REPLACEME
-->

Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed port will
*not* let the program exit if it's the only active handle left (the default
behavior). If the port is `ref()`ed, calling `ref()` again will have no effect.

If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.

### port.start()
<!-- YAML
added: REPLACEME
-->

Starts receiving messages on this `MessagePort`. When using this port
as an event emitter, this will be called automatically once `'message'`
listeners are attached.

### port.unref()
<!-- YAML
added: REPLACEME
-->

Calling `unref()` on a port will allow the thread to exit if this is the only
active handle in the event system. If the port is already `unref()`ed calling
`unref()` again will have no effect.

If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.

[`Buffer`]: buffer.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
[v8.serdes]: v8.html#v8_serialization_api
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/loaders.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@
};

NativeModule.isInternal = function(id) {
return id.startsWith('internal/');
return id.startsWith('internal/') ||
(id === 'worker' && !process.binding('config').experimentalWorker);
};
}

Expand Down
5 changes: 5 additions & 0 deletions lib/internal/modules/cjs/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ const builtinLibs = [
'v8', 'vm', 'zlib'
];

if (process.binding('config').experimentalWorker) {
builtinLibs.push('worker');
builtinLibs.sort();
}

if (typeof process.binding('inspector').open === 'function') {
builtinLibs.push('inspector');
builtinLibs.sort();
Expand Down
105 changes: 105 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
'use strict';

const EventEmitter = require('events');
const util = require('util');

const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
const { handle_onclose } = internalBinding('symbols');

util.inherits(MessagePort, EventEmitter);

const kOnMessageListener = Symbol('kOnMessageListener');

const debug = util.debuglog('worker');

// A MessagePort consists of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
debug('received message', payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};

// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
this.start();
} else {
this.unref();
this.stop();
}
}
});

// This is called from inside the `MessagePort` constructor.
function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: true,
writable: false,
value: oninit
});

// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
if (typeof this.onclose === 'function') {
// Not part of the Web standard yet, but there aren't many reasonable
// alternatives in a non-EventEmitter usage setting.
// Refs: https://github.com/whatwg/html/issues/1766
this.onclose();
}
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, handle_onclose, {
enumerable: false,
writable: false,
value: onclose
});

const originalClose = MessagePort.prototype.close;
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
originalClose.call(this);
};

function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
port.unref();
}
});
}

module.exports = {
MessagePort,
MessageChannel
};
5 changes: 5 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict';

const { MessagePort, MessageChannel } = require('internal/worker');

module.exports = { MessagePort, MessageChannel };
4 changes: 4 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
'lib/util.js',
'lib/v8.js',
'lib/vm.js',
'lib/worker.js',
'lib/zlib.js',
'lib/internal/assert.js',
'lib/internal/async_hooks.js',
Expand Down Expand Up @@ -156,6 +157,7 @@
'lib/internal/validators.js',
'lib/internal/stream_base_commons.js',
'lib/internal/vm/module.js',
'lib/internal/worker.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
Expand Down Expand Up @@ -334,6 +336,7 @@
'src/node_file.cc',
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_messaging.cc',
'src/node_os.cc',
'src/node_platform.cc',
'src/node_perf.cc',
Expand Down Expand Up @@ -391,6 +394,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_perf.h',
'src/node_perf_common.h',
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace node {
V(HTTP2SETTINGS) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
Expand Down
Loading

0 comments on commit e7a2367

Please sign in to comment.