@@ -681,6 +681,128 @@ never be called.
681681* Returns: {number} The same ` triggerAsyncId ` that is passed to the
682682` AsyncResource ` constructor.
683683
684+ <a id =" async-resource-worker-pool " ></a >
685+ ### Using ` AsyncResource ` for a ` Worker ` thread pool
686+
687+ The following example shows how to use the ` AsyncResource ` class to properly
688+ provide async tracking for a [ ` Worker ` ] [ ] pool. Other resource pools, such as
689+ database connection pools, can follow a similar model.
690+
691+ Assuming that the task is adding two numbers, using a file named
692+ ` task_processor.js ` with the following content:
693+
694+ ``` js
695+ const { parentPort } = require (' worker_threads' );
696+ parentPort .on (' message' , (task ) => {
697+ parentPort .postMessage (task .a + task .b );
698+ });
699+ ```
700+
701+ a Worker pool around it could use the following structure:
702+
703+ ``` js
704+ const { AsyncResource } = require (' async_hooks' );
705+ const { EventEmitter } = require (' events' );
706+ const path = require (' path' );
707+ const { Worker } = require (' worker_threads' );
708+
709+ const kTaskInfo = Symbol (' kTaskInfo' );
710+ const kWorkerFreedEvent = Symbol (' kWorkerFreedEvent' );
711+
712+ class WorkerPoolTaskInfo extends AsyncResource {
713+ constructor (callback ) {
714+ super (' WorkerPoolTaskInfo' );
715+ this .callback = callback;
716+ }
717+
718+ done (err , result ) {
719+ this .runInAsyncScope (this .callback , null , err, result);
720+ this .emitDestroy (); // `TaskInfo`s are used only once.
721+ }
722+ }
723+
724+ class WorkerPool extends EventEmitter {
725+ constructor (numThreads ) {
726+ super ();
727+ this .numThreads = numThreads;
728+ this .workers = [];
729+ this .freeWorkers = [];
730+
731+ for (let i = 0 ; i < numThreads; i++ )
732+ this .addNewWorker ();
733+ }
734+
735+ addNewWorker () {
736+ const worker = new Worker (path .resolve (__dirname , ' task_processor.js' ));
737+ worker .on (' message' , (result ) => {
738+ // In case of success: Call the callback that was passed to `runTask`,
739+ // remove the `TaskInfo` associated with the Worker, and mark it as free
740+ // again.
741+ worker[kTaskInfo].done (null , result);
742+ worker[kTaskInfo] = null ;
743+ this .freeWorkers .push (worker);
744+ this .emit (kWorkerFreedEvent);
745+ });
746+ worker .on (' error' , (err ) => {
747+ // In case of an uncaught exception: Call the callback that was passed to
748+ // `runTask` with the error.
749+ if (worker[kTaskInfo])
750+ worker[kTaskInfo].done (err, null );
751+ else
752+ this .emit (' error' , err);
753+ // Remove the worker from the list and start a new Worker to replace the
754+ // current one.
755+ this .workers .splice (this .workers .indexOf (worker), 1 );
756+ this .addNewWorker ();
757+ });
758+ this .workers .push (worker);
759+ this .freeWorkers .push (worker);
760+ }
761+
762+ runTask (task , callback ) {
763+ if (this .freeWorkers .length === 0 ) {
764+ // No free threads, wait until a worker thread becomes free.
765+ this .once (kWorkerFreedEvent, () => this .runTask (task, callback));
766+ return ;
767+ }
768+
769+ const worker = this .freeWorkers .pop ();
770+ worker[kTaskInfo] = new WorkerPoolTaskInfo (callback);
771+ worker .postMessage (task);
772+ }
773+
774+ close () {
775+ for (const worker of this .workers ) worker .terminate ();
776+ }
777+ }
778+
779+ module .exports = WorkerPool;
780+ ```
781+
782+ Without the explicit tracking added by the ` WorkerPoolTaskInfo ` objects,
783+ it would appear that the callbacks are associated with the individual ` Worker `
784+ objects. However, the creation of the ` Worker ` s is not associated with the
785+ creation of the tasks and does not provide information about when tasks
786+ were scheduled.
787+
788+ This pool could be used as follows:
789+
790+ ``` js
791+ const WorkerPool = require (' ./worker_pool.js' );
792+ const os = require (' os' );
793+
794+ const pool = new WorkerPool (os .cpus ().length );
795+
796+ let finished = 0 ;
797+ for (let i = 0 ; i < 10 ; i++ ) {
798+ pool .runTask ({ a: 42 , b: 100 }, (err , result ) => {
799+ console .log (i, err, result);
800+ if (++ finished === 10 )
801+ pool .close ();
802+ });
803+ }
804+ ```
805+
684806[ `after` callback ] : #async_hooks_after_asyncid
685807[ `before` callback ] : #async_hooks_before_asyncid
686808[ `destroy` callback ] : #async_hooks_destroy_asyncid
0 commit comments