Skip to content

Commit

Permalink
worker: add ports property to MessageEvents
Browse files Browse the repository at this point in the history
Add `ev.ports` for spec compliancy.

Since we only emit the raw `data` value, and only create the
`MessageEvent` instance if there are EventTarget-style listeners,
we store the ports list temporarily on the MessagePort object itself,
so that we can look it up when we need to create the event object.

Fixes: nodejs#37358
  • Loading branch information
addaleax committed Feb 27, 2021
1 parent 5968c54 commit 54da525
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 20 deletions.
16 changes: 12 additions & 4 deletions lib/internal/per_context/messageport.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,30 @@ const {
} = primordials;

class MessageEvent {
constructor(data, target, type) {
constructor(data, target, type, ports) {
this.data = data;
this.target = target;
this.type = type;
this.ports = ports ?? [];
}
}

const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch');
const kCurrentlyReceivingPorts =
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');

exports.emitMessage = function(data, type) {
exports.emitMessage = function(data, ports, type) {
if (typeof this[kHybridDispatch] === 'function') {
this[kHybridDispatch](data, type, undefined);
this[kCurrentlyReceivingPorts] = ports;
try {
this[kHybridDispatch](data, type, undefined);
} finally {
this[kCurrentlyReceivingPorts] = undefined;
}
return;
}

const event = new MessageEvent(data, this, type);
const event = new MessageEvent(data, this, type, ports);
if (type === 'message') {
if (typeof this.onmessage === 'function')
this.onmessage(event);
Expand Down
8 changes: 7 additions & 1 deletion lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const {
ObjectSetPrototypeOf,
ReflectApply,
Symbol,
SymbolFor,
} = primordials;

const {
Expand Down Expand Up @@ -70,6 +71,8 @@ const kWritableCallbacks = Symbol('kWritableCallbacks');
const kSource = Symbol('kSource');
const kStartedReading = Symbol('kStartedReading');
const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback');
const kCurrentlyReceivingPorts =
SymbolFor('nodejs.internal.kCurrentlyReceivingPorts');

const messageTypes = {
UP_AND_RUNNING: 'upAndRunning',
Expand Down Expand Up @@ -150,7 +153,9 @@ ObjectDefineProperty(
if (type !== 'message' && type !== 'messageerror') {
return ReflectApply(originalCreateEvent, this, arguments);
}
return new MessageEvent(type, { data });
const ports = this[kCurrentlyReceivingPorts];
this[kCurrentlyReceivingPorts] = undefined;
return new MessageEvent(type, { data, ports });
},
configurable: false,
writable: false,
Expand All @@ -161,6 +166,7 @@ ObjectDefineProperty(
function oninit() {
initNodeEventTarget(this);
setupPortReferencing(this, this, 'message');
this[kCurrentlyReceivingPorts] = undefined;
}

defineEventHandler(MessagePort.prototype, 'message');
Expand Down
44 changes: 36 additions & 8 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
} // anonymous namespace

MaybeLocal<Value> Message::Deserialize(Environment* env,
Local<Context> context) {
Local<Context> context,
Local<Value>* port_list) {
Context::Scope context_scope(context);

CHECK(!IsCloseMessage());
if (port_list != nullptr && !transferables_.empty()) {
// Need to create this outside of the EscapableHandleScope, but inside
// the Context::Scope.
*port_list = Array::New(env->isolate());
}

EscapableHandleScope handle_scope(env->isolate());
Context::Scope context_scope(context);

// Create all necessary objects for transferables, e.g. MessagePort handles.
std::vector<BaseObjectPtr<BaseObject>> host_objects(transferables_.size());
Expand All @@ -146,10 +153,27 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
});

for (uint32_t i = 0; i < transferables_.size(); ++i) {
HandleScope handle_scope(env->isolate());
TransferData* data = transferables_[i].get();
host_objects[i] = data->Deserialize(
env, context, std::move(transferables_[i]));
if (!host_objects[i]) return {};
if (port_list != nullptr) {
// If we gather a list of all message ports, and this transferred object
// is a message port, add it to that list. This is a bit of an odd case
// of special handling for MessagePorts (as opposed to applying to all
// transferables), but it's required for spec compliancy.
DCHECK(port_list->IsArray());
Local<Array> port_list_array = port_list->As<Array>();
Local<Object> obj = host_objects[i]->object();
if (env->message_port_constructor_template()->HasInstance(obj)) {
if (port_list_array->Set(context,
port_list_array->Length(),
obj).IsNothing()) {
return {};
}
}
}
}
transferables_.clear();

Expand Down Expand Up @@ -664,7 +688,8 @@ MessagePort* MessagePort::New(
}

MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
MessageProcessingMode mode) {
MessageProcessingMode mode,
Local<Value>* port_list) {
std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Expand Down Expand Up @@ -696,7 +721,7 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,

if (!env()->can_call_into_js()) return MaybeLocal<Value>();

return received->Deserialize(env(), context);
return received->Deserialize(env(), context, port_list);
}

void MessagePort::OnMessage(MessageProcessingMode mode) {
Expand Down Expand Up @@ -735,14 +760,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);

Local<Value> payload;
Local<Value> port_list = Undefined(env()->isolate());
Local<Value> message_error;
Local<Value> argv[2];
Local<Value> argv[3];

{
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
TryCatchScope try_catch(env());
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
message_error = try_catch.Exception();
goto reschedule;
Expand All @@ -757,13 +783,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) {
}

argv[0] = payload;
argv[1] = env()->message_string();
argv[1] = port_list;
argv[2] = env()->message_string();

if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) {
reschedule:
if (!message_error.IsEmpty()) {
argv[0] = message_error;
argv[1] = env()->messageerror_string();
argv[1] = Undefined(env()->isolate());
argv[2] = env()->messageerror_string();
USE(MakeCallback(emit_message, arraysize(argv), argv));
}

Expand Down
12 changes: 8 additions & 4 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class Message : public MemoryRetainer {

// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
v8::Local<v8::Context> context);
v8::MaybeLocal<v8::Value> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value>* port_list = nullptr);

// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
Expand Down Expand Up @@ -293,8 +295,10 @@ class MessagePort : public HandleWrap {
void OnClose() override;
void OnMessage(MessageProcessingMode mode);
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
MessageProcessingMode mode);
v8::MaybeLocal<v8::Value> ReceiveMessage(
v8::Local<v8::Context> context,
MessageProcessingMode mode,
v8::Local<v8::Value>* port_list = nullptr);

std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
Expand Down
12 changes: 9 additions & 3 deletions test/parallel/test-worker-message-port-move.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ vm.runInContext('(' + function() {

assert(!(port instanceof MessagePort));
assert.strictEqual(port.onmessage, undefined);
port.onmessage = function({ data }) {
port.onmessage = function({ data, ports }) {
assert(data instanceof Object);
port.postMessage(data);
assert(ports instanceof Array);
assert.strictEqual(ports.length, 1);
assert.strictEqual(ports[0], data.p);
assert(!(data.p instanceof MessagePort));
port.postMessage({});
};
port.start();
}
Expand All @@ -55,8 +59,10 @@ vm.runInContext('(' + function() {
}
} + ')()', context);

const otherChannel = new MessageChannel();
port2.on('message', common.mustCall((msg) => {
assert(msg instanceof Object);
port2.close();
otherChannel.port2.close();
}));
port2.postMessage({});
port2.postMessage({ p: otherChannel.port1 }, [ otherChannel.port1 ]);
14 changes: 14 additions & 0 deletions test/parallel/test-worker-message-port.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const { MessageChannel, MessagePort } = require('worker_threads');
port1.onmessage = common.mustCall((message) => {
assert.strictEqual(message.data, 4);
assert.strictEqual(message.target, port1);
assert.deepStrictEqual(message.ports, []);
port2.close(common.mustCall());
});

Expand Down Expand Up @@ -161,6 +162,19 @@ const { MessageChannel, MessagePort } = require('worker_threads');
port1.close();
}

{
// Test MessageEvent#ports
const c1 = new MessageChannel();
const c2 = new MessageChannel();
c1.port1.postMessage({ port: c2.port2 }, [ c2.port2 ]);
c1.port2.addEventListener('message', common.mustCall((ev) => {
assert.strictEqual(ev.ports.length, 1);
assert.strictEqual(ev.ports[0].constructor, MessagePort);
c1.port1.close();
c2.port1.close();
}));
}

{
assert.deepStrictEqual(
Object.getOwnPropertyNames(MessagePort.prototype).sort(),
Expand Down

0 comments on commit 54da525

Please sign in to comment.