Skip to content

Commit 8fbf662

Browse files
dygabotargos
authored andcommitted
module: have a single hooks thread for all workers
PR-URL: #52706 Reviewed-By: Geoffrey Booth <webadmin@geoffreybooth.com> Reviewed-By: Jacob Smith <jacob@frende.me> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
1 parent 9a7ae9b commit 8fbf662

23 files changed

+395
-81
lines changed

lib/internal/main/worker_thread.js

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ port.on('message', (message) => {
9595
filename,
9696
hasStdin,
9797
publicPort,
98+
hooksPort,
9899
workerData,
99100
} = message;
100101

@@ -109,6 +110,7 @@ port.on('message', (message) => {
109110
}
110111

111112
require('internal/worker').assignEnvironmentData(environmentData);
113+
require('internal/worker').hooksPort = hooksPort;
112114

113115
if (SharedArrayBuffer !== undefined) {
114116
// The counter is only passed to the workers created by the main thread,

lib/internal/modules/esm/hooks.js

+84-43
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ const {
3535
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
3636
const { URL } = require('internal/url');
3737
const { canParse: URLCanParse } = internalBinding('url');
38-
const { receiveMessageOnPort } = require('worker_threads');
38+
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
3939
const {
4040
isAnyArrayBuffer,
4141
isArrayBufferView,
@@ -482,6 +482,8 @@ class HooksProxy {
482482
*/
483483
#worker;
484484

485+
#portToHooksThread;
486+
485487
/**
486488
* The last notification ID received from the worker. This is used to detect
487489
* if the worker has already sent a notification before putting the main
@@ -499,26 +501,38 @@ class HooksProxy {
499501
#isReady = false;
500502

501503
constructor() {
502-
const { InternalWorker } = require('internal/worker');
503-
MessageChannel ??= require('internal/worker/io').MessageChannel;
504-
504+
const { InternalWorker, hooksPort } = require('internal/worker');
505505
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
506506
this.#lock = new Int32Array(lock);
507507

508-
this.#worker = new InternalWorker(loaderWorkerId, {
509-
stderr: false,
510-
stdin: false,
511-
stdout: false,
512-
trackUnmanagedFds: false,
513-
workerData: {
514-
lock,
515-
},
516-
});
517-
this.#worker.unref(); // ! Allows the process to eventually exit.
518-
this.#worker.on('exit', process.exit);
508+
if (isMainThread) {
509+
// Main thread is the only one that creates the internal single hooks worker
510+
this.#worker = new InternalWorker(loaderWorkerId, {
511+
stderr: false,
512+
stdin: false,
513+
stdout: false,
514+
trackUnmanagedFds: false,
515+
workerData: {
516+
lock,
517+
},
518+
});
519+
this.#worker.unref(); // ! Allows the process to eventually exit.
520+
this.#worker.on('exit', process.exit);
521+
this.#portToHooksThread = this.#worker;
522+
} else {
523+
this.#portToHooksThread = hooksPort;
524+
}
519525
}
520526

521527
waitForWorker() {
528+
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
529+
// has an InternalWorker. That was the Hooks instance created for the main thread.
530+
// It means for all Hooks instances that are not on the main thread => they are ready because they
531+
// delegate to the single InternalWorker anyway.
532+
if (!isMainThread) {
533+
return;
534+
}
535+
522536
if (!this.#isReady) {
523537
const { kIsOnline } = require('internal/worker');
524538
if (!this.#worker[kIsOnline]) {
@@ -535,6 +549,37 @@ class HooksProxy {
535549
}
536550
}
537551

552+
#postMessageToWorker(method, type, transferList, args) {
553+
this.waitForWorker();
554+
555+
MessageChannel ??= require('internal/worker/io').MessageChannel;
556+
557+
const {
558+
port1: fromHooksThread,
559+
port2: toHooksThread,
560+
} = new MessageChannel();
561+
562+
// Pass work to the worker.
563+
debug(`post ${type} message to worker`, { method, args, transferList });
564+
const usedTransferList = [toHooksThread];
565+
if (transferList) {
566+
ArrayPrototypePushApply(usedTransferList, transferList);
567+
}
568+
569+
this.#portToHooksThread.postMessage(
570+
{
571+
__proto__: null,
572+
args,
573+
lock: this.#lock,
574+
method,
575+
port: toHooksThread,
576+
},
577+
usedTransferList,
578+
);
579+
580+
return fromHooksThread;
581+
}
582+
538583
/**
539584
* Invoke a remote method asynchronously.
540585
* @param {string} method Method to invoke
@@ -543,22 +588,7 @@ class HooksProxy {
543588
* @returns {Promise<any>}
544589
*/
545590
async makeAsyncRequest(method, transferList, ...args) {
546-
this.waitForWorker();
547-
548-
MessageChannel ??= require('internal/worker/io').MessageChannel;
549-
const asyncCommChannel = new MessageChannel();
550-
551-
// Pass work to the worker.
552-
debug('post async message to worker', { method, args, transferList });
553-
const finalTransferList = [asyncCommChannel.port2];
554-
if (transferList) {
555-
ArrayPrototypePushApply(finalTransferList, transferList);
556-
}
557-
this.#worker.postMessage({
558-
__proto__: null,
559-
method, args,
560-
port: asyncCommChannel.port2,
561-
}, finalTransferList);
591+
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);
562592

563593
if (this.#numberOfPendingAsyncResponses++ === 0) {
564594
// On the next lines, the main thread will await a response from the worker thread that might
@@ -567,7 +597,11 @@ class HooksProxy {
567597
// However we want to keep the process alive until the worker thread responds (or until the
568598
// event loop of the worker thread is also empty), so we ref the worker until we get all the
569599
// responses back.
570-
this.#worker.ref();
600+
if (this.#worker) {
601+
this.#worker.ref();
602+
} else {
603+
this.#portToHooksThread.ref();
604+
}
571605
}
572606

573607
let response;
@@ -576,18 +610,26 @@ class HooksProxy {
576610
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
577611
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
578612

579-
response = receiveMessageOnPort(asyncCommChannel.port1);
613+
response = receiveMessageOnPort(fromHooksThread);
580614
} while (response == null);
581615
debug('got async response from worker', { method, args }, this.#lock);
582616

583617
if (--this.#numberOfPendingAsyncResponses === 0) {
584618
// We got all the responses from the worker, its job is done (until next time).
585-
this.#worker.unref();
619+
if (this.#worker) {
620+
this.#worker.unref();
621+
} else {
622+
this.#portToHooksThread.unref();
623+
}
624+
}
625+
626+
if (response.message.status === 'exit') {
627+
process.exit(response.message.body);
586628
}
587629

588-
const body = this.#unwrapMessage(response);
589-
asyncCommChannel.port1.close();
590-
return body;
630+
fromHooksThread.close();
631+
632+
return this.#unwrapMessage(response);
591633
}
592634

593635
/**
@@ -598,11 +640,7 @@ class HooksProxy {
598640
* @returns {any}
599641
*/
600642
makeSyncRequest(method, transferList, ...args) {
601-
this.waitForWorker();
602-
603-
// Pass work to the worker.
604-
debug('post sync message to worker', { method, args, transferList });
605-
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
643+
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);
606644

607645
let response;
608646
do {
@@ -611,14 +649,17 @@ class HooksProxy {
611649
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
612650
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
613651

614-
response = this.#worker.receiveMessageSync();
652+
response = receiveMessageOnPort(fromHooksThread);
615653
} while (response == null);
616654
debug('got sync response from worker', { method, args });
617655
if (response.message.status === 'never-settle') {
618656
process.exit(kUnsettledTopLevelAwait);
619657
} else if (response.message.status === 'exit') {
620658
process.exit(response.message.body);
621659
}
660+
661+
fromHooksThread.close();
662+
622663
return this.#unwrapMessage(response);
623664
}
624665

lib/internal/modules/esm/loader.js

+16-3
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
4141
const {
4242
urlToFilename,
4343
} = require('internal/modules/helpers');
44+
const { isMainThread } = require('worker_threads');
4445
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;
4546

4647
/**
@@ -607,21 +608,26 @@ class CustomizedModuleLoader {
607608
*/
608609
constructor() {
609610
getHooksProxy();
611+
_hasCustomizations = true;
610612
}
611613

612614
/**
613-
* Register some loader specifier.
615+
* Register a loader specifier.
614616
* @param {string} originalSpecifier The specified URL path of the loader to
615617
* be registered.
616618
* @param {string} parentURL The parent URL from where the loader will be
617619
* registered if using it package name as specifier
618620
* @param {any} [data] Arbitrary data to be passed from the custom loader
619621
* (user-land) to the worker.
620622
* @param {any[]} [transferList] Objects in `data` that are changing ownership
621-
* @returns {{ format: string, url: URL['href'] }}
623+
* @returns {{ format: string, url: URL['href'] } | undefined}
622624
*/
623625
register(originalSpecifier, parentURL, data, transferList) {
624-
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
626+
if (isMainThread) {
627+
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
628+
// delegate their hooks to the HooksThread of the main thread.
629+
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
630+
}
625631
}
626632

627633
/**
@@ -719,6 +725,12 @@ function getHooksProxy() {
719725
return hooksProxy;
720726
}
721727

728+
let _hasCustomizations = false;
729+
function hasCustomizations() {
730+
return _hasCustomizations;
731+
}
732+
733+
722734
let cascadedLoader;
723735

724736
/**
@@ -780,6 +792,7 @@ function register(specifier, parentURL = undefined, options) {
780792

781793
module.exports = {
782794
createModuleLoader,
795+
hasCustomizations,
783796
getHooksProxy,
784797
getOrInitializeCascadedLoader,
785798
register,

0 commit comments

Comments
 (0)