Skip to content

Commit 9868d54

Browse files
committed
worker: create per-Environment message port after bootstrap
PR-URL: #26593 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 927f29d commit 9868d54

File tree

5 files changed

+45
-38
lines changed

5 files changed

+45
-38
lines changed

lib/internal/bootstrap/node.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ if (isMainThread) {
146146
setupProcessStdio(getStdout, getStdin, getStderr);
147147
} else {
148148
const { getStdout, getStdin, getStderr } =
149-
workerThreadSetup.initializeWorkerStdio();
149+
workerThreadSetup.createStdioGetters();
150150
setupProcessStdio(getStdout, getStdin, getStderr);
151151
}
152152

lib/internal/process/worker_thread_only.js

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,25 @@
22

33
// This file contains process bootstrappers that can only be
44
// run in the worker thread.
5-
const {
6-
getEnvMessagePort
7-
} = internalBinding('worker');
85

96
const {
10-
kWaitingStreams,
11-
ReadableWorkerStdio,
12-
WritableWorkerStdio
7+
createWorkerStdio
138
} = require('internal/worker/io');
149

1510
const {
1611
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
1712
} = require('internal/errors');
18-
const workerStdio = {};
19-
20-
function initializeWorkerStdio() {
21-
const port = getEnvMessagePort();
22-
port[kWaitingStreams] = 0;
23-
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
24-
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
25-
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');
2613

14+
let workerStdio;
15+
function lazyWorkerStdio() {
16+
if (!workerStdio) workerStdio = createWorkerStdio();
17+
return workerStdio;
18+
}
19+
function createStdioGetters() {
2720
return {
28-
getStdout() { return workerStdio.stdout; },
29-
getStderr() { return workerStdio.stderr; },
30-
getStdin() { return workerStdio.stdin; }
21+
getStdout() { return lazyWorkerStdio().stdout; },
22+
getStderr() { return lazyWorkerStdio().stderr; },
23+
getStdin() { return lazyWorkerStdio().stdin; }
3124
};
3225
}
3326

@@ -55,7 +48,7 @@ function unavailable(name) {
5548
}
5649

5750
module.exports = {
58-
initializeWorkerStdio,
51+
createStdioGetters,
5952
unavailable,
6053
wrapProcessMethods
6154
};

lib/internal/worker/io.js

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ const {
1111
moveMessagePortToContext,
1212
stopMessagePort
1313
} = internalBinding('messaging');
14-
const { threadId } = internalBinding('worker');
14+
const {
15+
threadId,
16+
getEnvMessagePort
17+
} = internalBinding('worker');
1518

1619
const { Readable, Writable } = require('stream');
1720
const EventEmitter = require('events');
@@ -227,6 +230,16 @@ class WritableWorkerStdio extends Writable {
227230
}
228231
}
229232

233+
function createWorkerStdio() {
234+
const port = getEnvMessagePort();
235+
port[kWaitingStreams] = 0;
236+
return {
237+
stdin: new ReadableWorkerStdio(port, 'stdin'),
238+
stdout: new WritableWorkerStdio(port, 'stdout'),
239+
stderr: new WritableWorkerStdio(port, 'stderr')
240+
};
241+
}
242+
230243
module.exports = {
231244
drainMessagePort,
232245
messageTypes,
@@ -239,5 +252,6 @@ module.exports = {
239252
MessageChannel,
240253
setupPortReferencing,
241254
ReadableWorkerStdio,
242-
WritableWorkerStdio
255+
WritableWorkerStdio,
256+
createWorkerStdio
243257
};

src/node_worker.cc

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -269,22 +269,6 @@ void Worker::Run() {
269269
Debug(this, "Created Environment for worker with id %llu", thread_id_);
270270
if (is_stopped()) return;
271271
{
272-
HandleScope handle_scope(isolate_);
273-
Mutex::ScopedLock lock(mutex_);
274-
// Set up the message channel for receiving messages in the child.
275-
child_port_ = MessagePort::New(env_.get(),
276-
env_->context(),
277-
std::move(child_port_data_));
278-
// MessagePort::New() may return nullptr if execution is terminated
279-
// within it.
280-
if (child_port_ != nullptr)
281-
env_->set_message_port(child_port_->object(isolate_));
282-
283-
Debug(this, "Created message port for worker %llu", thread_id_);
284-
}
285-
286-
if (is_stopped()) return;
287-
{
288272
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
289273
StartWorkerInspector(env_.get(),
290274
std::move(inspector_parent_handle_),
@@ -296,6 +280,9 @@ void Worker::Run() {
296280
Environment::AsyncCallbackScope callback_scope(env_.get());
297281
env_->async_hooks()->push_async_ids(1, 0);
298282
if (!RunBootstrapping(env_.get()).IsEmpty()) {
283+
CreateEnvMessagePort(env_.get());
284+
if (is_stopped()) return;
285+
Debug(this, "Created message port for worker %llu", thread_id_);
299286
USE(StartExecution(env_.get(), "internal/main/worker_thread"));
300287
}
301288

@@ -348,6 +335,19 @@ void Worker::Run() {
348335
Debug(this, "Worker %llu thread stops", thread_id_);
349336
}
350337

338+
void Worker::CreateEnvMessagePort(Environment* env) {
339+
HandleScope handle_scope(isolate_);
340+
Mutex::ScopedLock lock(mutex_);
341+
// Set up the message channel for receiving messages in the child.
342+
child_port_ = MessagePort::New(env,
343+
env->context(),
344+
std::move(child_port_data_));
345+
// MessagePort::New() may return nullptr if execution is terminated
346+
// within it.
347+
if (child_port_ != nullptr)
348+
env->set_message_port(child_port_->object(isolate_));
349+
}
350+
351351
void Worker::JoinThread() {
352352
if (thread_joined_)
353353
return;

src/node_worker.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class Worker : public AsyncWrap {
5050

5151
private:
5252
void OnThreadStopped();
53-
53+
void CreateEnvMessagePort(Environment* env);
5454
const std::string url_;
5555

5656
std::shared_ptr<PerIsolateOptions> per_isolate_opts_;

0 commit comments

Comments
 (0)