@@ -35,7 +35,7 @@ const {
35
35
const { exitCodes : { kUnsettledTopLevelAwait } } = internalBinding ( 'errors' ) ;
36
36
const { URL } = require ( 'internal/url' ) ;
37
37
const { canParse : URLCanParse } = internalBinding ( 'url' ) ;
38
- const { receiveMessageOnPort } = require ( 'worker_threads' ) ;
38
+ const { receiveMessageOnPort, isMainThread } = require ( 'worker_threads' ) ;
39
39
const {
40
40
isAnyArrayBuffer,
41
41
isArrayBufferView,
@@ -482,6 +482,8 @@ class HooksProxy {
482
482
*/
483
483
#worker;
484
484
485
+ #portToHooksThread;
486
+
485
487
/**
486
488
* The last notification ID received from the worker. This is used to detect
487
489
* if the worker has already sent a notification before putting the main
@@ -499,26 +501,38 @@ class HooksProxy {
499
501
#isReady = false ;
500
502
501
503
constructor ( ) {
502
- const { InternalWorker } = require ( 'internal/worker' ) ;
503
- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
504
-
504
+ const { InternalWorker, hooksPort } = require ( 'internal/worker' ) ;
505
505
const lock = new SharedArrayBuffer ( SHARED_MEMORY_BYTE_LENGTH ) ;
506
506
this . #lock = new Int32Array ( lock ) ;
507
507
508
- this . #worker = new InternalWorker ( loaderWorkerId , {
509
- stderr : false ,
510
- stdin : false ,
511
- stdout : false ,
512
- trackUnmanagedFds : false ,
513
- workerData : {
514
- lock,
515
- } ,
516
- } ) ;
517
- this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
518
- this . #worker. on ( 'exit' , process . exit ) ;
508
+ if ( isMainThread ) {
509
+ // Main thread is the only one that creates the internal single hooks worker
510
+ this . #worker = new InternalWorker ( loaderWorkerId , {
511
+ stderr : false ,
512
+ stdin : false ,
513
+ stdout : false ,
514
+ trackUnmanagedFds : false ,
515
+ workerData : {
516
+ lock,
517
+ } ,
518
+ } ) ;
519
+ this . #worker. unref ( ) ; // ! Allows the process to eventually exit.
520
+ this . #worker. on ( 'exit' , process . exit ) ;
521
+ this . #portToHooksThread = this . #worker;
522
+ } else {
523
+ this . #portToHooksThread = hooksPort ;
524
+ }
519
525
}
520
526
521
527
waitForWorker ( ) {
528
+ // There is one Hooks instance for each worker thread. But only one of these Hooks instances
529
+ // has an InternalWorker. That was the Hooks instance created for the main thread.
530
+ // It means for all Hooks instances that are not on the main thread => they are ready because they
531
+ // delegate to the single InternalWorker anyway.
532
+ if ( ! isMainThread ) {
533
+ return ;
534
+ }
535
+
522
536
if ( ! this . #isReady) {
523
537
const { kIsOnline } = require ( 'internal/worker' ) ;
524
538
if ( ! this . #worker[ kIsOnline ] ) {
@@ -535,6 +549,37 @@ class HooksProxy {
535
549
}
536
550
}
537
551
552
+ #postMessageToWorker( method , type , transferList , args ) {
553
+ this . waitForWorker ( ) ;
554
+
555
+ MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
556
+
557
+ const {
558
+ port1 : fromHooksThread ,
559
+ port2 : toHooksThread ,
560
+ } = new MessageChannel ( ) ;
561
+
562
+ // Pass work to the worker.
563
+ debug ( `post ${ type } message to worker` , { method, args, transferList } ) ;
564
+ const usedTransferList = [ toHooksThread ] ;
565
+ if ( transferList ) {
566
+ ArrayPrototypePushApply ( usedTransferList , transferList ) ;
567
+ }
568
+
569
+ this . #portToHooksThread. postMessage (
570
+ {
571
+ __proto__ : null ,
572
+ args,
573
+ lock : this . #lock,
574
+ method,
575
+ port : toHooksThread ,
576
+ } ,
577
+ usedTransferList ,
578
+ ) ;
579
+
580
+ return fromHooksThread ;
581
+ }
582
+
538
583
/**
539
584
* Invoke a remote method asynchronously.
540
585
* @param {string } method Method to invoke
@@ -543,22 +588,7 @@ class HooksProxy {
543
588
* @returns {Promise<any> }
544
589
*/
545
590
async makeAsyncRequest ( method , transferList , ...args ) {
546
- this . waitForWorker ( ) ;
547
-
548
- MessageChannel ??= require ( 'internal/worker/io' ) . MessageChannel ;
549
- const asyncCommChannel = new MessageChannel ( ) ;
550
-
551
- // Pass work to the worker.
552
- debug ( 'post async message to worker' , { method, args, transferList } ) ;
553
- const finalTransferList = [ asyncCommChannel . port2 ] ;
554
- if ( transferList ) {
555
- ArrayPrototypePushApply ( finalTransferList , transferList ) ;
556
- }
557
- this . #worker. postMessage ( {
558
- __proto__ : null ,
559
- method, args,
560
- port : asyncCommChannel . port2 ,
561
- } , finalTransferList ) ;
591
+ const fromHooksThread = this . #postMessageToWorker( method , 'Async' , transferList , args ) ;
562
592
563
593
if ( this . #numberOfPendingAsyncResponses++ === 0 ) {
564
594
// On the next lines, the main thread will await a response from the worker thread that might
@@ -567,7 +597,11 @@ class HooksProxy {
567
597
// However we want to keep the process alive until the worker thread responds (or until the
568
598
// event loop of the worker thread is also empty), so we ref the worker until we get all the
569
599
// responses back.
570
- this . #worker. ref ( ) ;
600
+ if ( this . #worker) {
601
+ this . #worker. ref ( ) ;
602
+ } else {
603
+ this . #portToHooksThread. ref ( ) ;
604
+ }
571
605
}
572
606
573
607
let response ;
@@ -576,18 +610,26 @@ class HooksProxy {
576
610
await AtomicsWaitAsync ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) . value ;
577
611
this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
578
612
579
- response = receiveMessageOnPort ( asyncCommChannel . port1 ) ;
613
+ response = receiveMessageOnPort ( fromHooksThread ) ;
580
614
} while ( response == null ) ;
581
615
debug ( 'got async response from worker' , { method, args } , this . #lock) ;
582
616
583
617
if ( -- this . #numberOfPendingAsyncResponses === 0 ) {
584
618
// We got all the responses from the worker, its job is done (until next time).
585
- this . #worker. unref ( ) ;
619
+ if ( this . #worker) {
620
+ this . #worker. unref ( ) ;
621
+ } else {
622
+ this . #portToHooksThread. unref ( ) ;
623
+ }
624
+ }
625
+
626
+ if ( response . message . status === 'exit' ) {
627
+ process . exit ( response . message . body ) ;
586
628
}
587
629
588
- const body = this . #unwrapMessage ( response ) ;
589
- asyncCommChannel . port1 . close ( ) ;
590
- return body ;
630
+ fromHooksThread . close ( ) ;
631
+
632
+ return this . #unwrapMessage ( response ) ;
591
633
}
592
634
593
635
/**
@@ -598,11 +640,7 @@ class HooksProxy {
598
640
* @returns {any }
599
641
*/
600
642
makeSyncRequest ( method , transferList , ...args ) {
601
- this . waitForWorker ( ) ;
602
-
603
- // Pass work to the worker.
604
- debug ( 'post sync message to worker' , { method, args, transferList } ) ;
605
- this . #worker. postMessage ( { __proto__ : null , method, args } , transferList ) ;
643
+ const fromHooksThread = this . #postMessageToWorker( method , 'Sync' , transferList , args ) ;
606
644
607
645
let response ;
608
646
do {
@@ -611,14 +649,17 @@ class HooksProxy {
611
649
AtomicsWait ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION , this . #workerNotificationLastId) ;
612
650
this . #workerNotificationLastId = AtomicsLoad ( this . #lock, WORKER_TO_MAIN_THREAD_NOTIFICATION ) ;
613
651
614
- response = this . #worker . receiveMessageSync ( ) ;
652
+ response = receiveMessageOnPort ( fromHooksThread ) ;
615
653
} while ( response == null ) ;
616
654
debug ( 'got sync response from worker' , { method, args } ) ;
617
655
if ( response . message . status === 'never-settle' ) {
618
656
process . exit ( kUnsettledTopLevelAwait ) ;
619
657
} else if ( response . message . status === 'exit' ) {
620
658
process . exit ( response . message . body ) ;
621
659
}
660
+
661
+ fromHooksThread . close ( ) ;
662
+
622
663
return this . #unwrapMessage( response ) ;
623
664
}
624
665
0 commit comments