@@ -5,6 +5,7 @@ const EventEmitter = require('events');
55const assert = require ( 'assert' ) ;
66const path = require ( 'path' ) ;
77const util = require ( 'util' ) ;
8+ const { Readable, Writable } = require ( 'stream' ) ;
89const {
910 ERR_INVALID_ARG_TYPE ,
1011 ERR_WORKER_NEED_ABSOLUTE_PATH ,
@@ -29,13 +30,20 @@ const isMainThread = threadId === 0;
2930
3031const kOnMessageListener = Symbol ( 'kOnMessageListener' ) ;
3132const kHandle = Symbol ( 'kHandle' ) ;
33+ const kName = Symbol ( 'kName' ) ;
3234const kPort = Symbol ( 'kPort' ) ;
3335const kPublicPort = Symbol ( 'kPublicPort' ) ;
3436const kDispose = Symbol ( 'kDispose' ) ;
3537const kOnExit = Symbol ( 'kOnExit' ) ;
3638const kOnMessage = Symbol ( 'kOnMessage' ) ;
3739const kOnCouldNotSerializeErr = Symbol ( 'kOnCouldNotSerializeErr' ) ;
3840const kOnErrorMessage = Symbol ( 'kOnErrorMessage' ) ;
41+ const kParentSideStdio = Symbol ( 'kParentSideStdio' ) ;
42+ const kWritableCallbacks = Symbol ( 'kWritableCallbacks' ) ;
43+ const kStdioWantsMoreDataCallback = Symbol ( 'kStdioWantsMoreDataCallback' ) ;
44+ const kStartedReading = Symbol ( 'kStartedReading' ) ;
45+ const kWaitingStreams = Symbol ( 'kWaitingStreams' ) ;
46+ const kIncrementsPortRef = Symbol ( 'kIncrementsPortRef' ) ;
3947
4048const debug = util . debuglog ( 'worker' ) ;
4149
@@ -129,6 +137,72 @@ function setupPortReferencing(port, eventEmitter, eventName) {
129137}
130138
131139
140+ class ReadableWorkerStdio extends Readable {
141+ constructor ( port , name ) {
142+ super ( ) ;
143+ this [ kPort ] = port ;
144+ this [ kName ] = name ;
145+ this [ kIncrementsPortRef ] = true ;
146+ this [ kStartedReading ] = false ;
147+ this . on ( 'end' , ( ) => {
148+ if ( this [ kIncrementsPortRef ] && -- this [ kPort ] [ kWaitingStreams ] === 0 )
149+ this [ kPort ] . unref ( ) ;
150+ } ) ;
151+ }
152+
153+ _read ( ) {
154+ if ( ! this [ kStartedReading ] && this [ kIncrementsPortRef ] ) {
155+ this [ kStartedReading ] = true ;
156+ if ( this [ kPort ] [ kWaitingStreams ] ++ === 0 )
157+ this [ kPort ] . ref ( ) ;
158+ }
159+
160+ this [ kPort ] . postMessage ( {
161+ type : 'stdioWantsMoreData' ,
162+ stream : this [ kName ]
163+ } ) ;
164+ }
165+ }
166+
167+ class WritableWorkerStdio extends Writable {
168+ constructor ( port , name ) {
169+ super ( { decodeStrings : false } ) ;
170+ this [ kPort ] = port ;
171+ this [ kName ] = name ;
172+ this [ kWritableCallbacks ] = [ ] ;
173+ }
174+
175+ _write ( chunk , encoding , cb ) {
176+ this [ kPort ] . postMessage ( {
177+ type : 'stdioPayload' ,
178+ stream : this [ kName ] ,
179+ chunk,
180+ encoding
181+ } ) ;
182+ this [ kWritableCallbacks ] . push ( cb ) ;
183+ if ( this [ kPort ] [ kWaitingStreams ] ++ === 0 )
184+ this [ kPort ] . ref ( ) ;
185+ }
186+
187+ _final ( cb ) {
188+ this [ kPort ] . postMessage ( {
189+ type : 'stdioPayload' ,
190+ stream : this [ kName ] ,
191+ chunk : null
192+ } ) ;
193+ cb ( ) ;
194+ }
195+
196+ [ kStdioWantsMoreDataCallback ] ( ) {
197+ const cbs = this [ kWritableCallbacks ] ;
198+ this [ kWritableCallbacks ] = [ ] ;
199+ for ( const cb of cbs )
200+ cb ( ) ;
201+ if ( ( this [ kPort ] [ kWaitingStreams ] -= cbs . length ) === 0 )
202+ this [ kPort ] . unref ( ) ;
203+ }
204+ }
205+
132206class Worker extends EventEmitter {
133207 constructor ( filename , options = { } ) {
134208 super ( ) ;
@@ -154,8 +228,25 @@ class Worker extends EventEmitter {
154228 this [ kPort ] . on ( 'message' , ( data ) => this [ kOnMessage ] ( data ) ) ;
155229 this [ kPort ] . start ( ) ;
156230 this [ kPort ] . unref ( ) ;
231+ this [ kPort ] [ kWaitingStreams ] = 0 ;
157232 debug ( `[${ threadId } ] created Worker with ID ${ this . threadId } ` ) ;
158233
234+ let stdin = null ;
235+ if ( options . stdin )
236+ stdin = new WritableWorkerStdio ( this [ kPort ] , 'stdin' ) ;
237+ const stdout = new ReadableWorkerStdio ( this [ kPort ] , 'stdout' ) ;
238+ if ( ! options . stdout ) {
239+ stdout [ kIncrementsPortRef ] = false ;
240+ pipeWithoutWarning ( stdout , process . stdout ) ;
241+ }
242+ const stderr = new ReadableWorkerStdio ( this [ kPort ] , 'stderr' ) ;
243+ if ( ! options . stderr ) {
244+ stderr [ kIncrementsPortRef ] = false ;
245+ pipeWithoutWarning ( stderr , process . stderr ) ;
246+ }
247+
248+ this [ kParentSideStdio ] = { stdin, stdout, stderr } ;
249+
159250 const { port1, port2 } = new MessageChannel ( ) ;
160251 this [ kPublicPort ] = port1 ;
161252 this [ kPublicPort ] . on ( 'message' , ( message ) => this . emit ( 'message' , message ) ) ;
@@ -165,7 +256,8 @@ class Worker extends EventEmitter {
165256 filename,
166257 doEval : ! ! options . eval ,
167258 workerData : options . workerData ,
168- publicPort : port2
259+ publicPort : port2 ,
260+ hasStdin : ! ! options . stdin
169261 } , [ port2 ] ) ;
170262 // Actually start the new thread now that everything is in place.
171263 this [ kHandle ] . startThread ( ) ;
@@ -197,6 +289,16 @@ class Worker extends EventEmitter {
197289 return this [ kOnCouldNotSerializeErr ] ( ) ;
198290 case 'errorMessage' :
199291 return this [ kOnErrorMessage ] ( message . error ) ;
292+ case 'stdioPayload' :
293+ {
294+ const { stream, chunk, encoding } = message ;
295+ return this [ kParentSideStdio ] [ stream ] . push ( chunk , encoding ) ;
296+ }
297+ case 'stdioWantsMoreData' :
298+ {
299+ const { stream } = message ;
300+ return this [ kParentSideStdio ] [ stream ] [ kStdioWantsMoreDataCallback ] ( ) ;
301+ }
200302 }
201303
202304 assert . fail ( `Unknown worker message type ${ message . type } ` ) ;
@@ -207,6 +309,18 @@ class Worker extends EventEmitter {
207309 this [ kHandle ] = null ;
208310 this [ kPort ] = null ;
209311 this [ kPublicPort ] = null ;
312+
313+ const { stdout, stderr } = this [ kParentSideStdio ] ;
314+ this [ kParentSideStdio ] = null ;
315+
316+ if ( ! stdout . _readableState . ended ) {
317+ debug ( `[${ threadId } ] explicitly closes stdout for ${ this . threadId } ` ) ;
318+ stdout . push ( null ) ;
319+ }
320+ if ( ! stderr . _readableState . ended ) {
321+ debug ( `[${ threadId } ] explicitly closes stderr for ${ this . threadId } ` ) ;
322+ stderr . push ( null ) ;
323+ }
210324 }
211325
212326 postMessage ( ...args ) {
@@ -243,6 +357,27 @@ class Worker extends EventEmitter {
243357
244358 return this [ kHandle ] . threadId ;
245359 }
360+
361+ get stdin ( ) {
362+ return this [ kParentSideStdio ] . stdin ;
363+ }
364+
365+ get stdout ( ) {
366+ return this [ kParentSideStdio ] . stdout ;
367+ }
368+
369+ get stderr ( ) {
370+ return this [ kParentSideStdio ] . stderr ;
371+ }
372+ }
373+
374+ const workerStdio = { } ;
375+ if ( ! isMainThread ) {
376+ const port = getEnvMessagePort ( ) ;
377+ port [ kWaitingStreams ] = 0 ;
378+ workerStdio . stdin = new ReadableWorkerStdio ( port , 'stdin' ) ;
379+ workerStdio . stdout = new WritableWorkerStdio ( port , 'stdout' ) ;
380+ workerStdio . stderr = new WritableWorkerStdio ( port , 'stderr' ) ;
246381}
247382
248383let originalFatalException ;
@@ -256,10 +391,14 @@ function setupChild(evalScript) {
256391
257392 port . on ( 'message' , ( message ) => {
258393 if ( message . type === 'loadScript' ) {
259- const { filename, doEval, workerData, publicPort } = message ;
394+ const { filename, doEval, workerData, publicPort, hasStdin } = message ;
260395 publicWorker . parentPort = publicPort ;
261396 setupPortReferencing ( publicPort , publicPort , 'message' ) ;
262397 publicWorker . workerData = workerData ;
398+
399+ if ( ! hasStdin )
400+ workerStdio . stdin . push ( null ) ;
401+
263402 debug ( `[${ threadId } ] starts worker script ${ filename } ` +
264403 `(eval = ${ eval } ) at cwd = ${ process . cwd ( ) } ` ) ;
265404 port . unref ( ) ;
@@ -271,6 +410,14 @@ function setupChild(evalScript) {
271410 require ( 'module' ) . runMain ( ) ;
272411 }
273412 return ;
413+ } else if ( message . type === 'stdioPayload' ) {
414+ const { stream, chunk, encoding } = message ;
415+ workerStdio [ stream ] . push ( chunk , encoding ) ;
416+ return ;
417+ } else if ( message . type === 'stdioWantsMoreData' ) {
418+ const { stream } = message ;
419+ workerStdio [ stream ] [ kStdioWantsMoreDataCallback ] ( ) ;
420+ return ;
274421 }
275422
276423 assert . fail ( `Unknown worker message type ${ message . type } ` ) ;
@@ -317,11 +464,24 @@ function deserializeError(error) {
317464 error . byteLength ) . toString ( 'utf8' ) ;
318465}
319466
467+ function pipeWithoutWarning ( source , dest ) {
468+ const sourceMaxListeners = source . _maxListeners ;
469+ const destMaxListeners = dest . _maxListeners ;
470+ source . setMaxListeners ( Infinity ) ;
471+ dest . setMaxListeners ( Infinity ) ;
472+
473+ source . pipe ( dest ) ;
474+
475+ source . _maxListeners = sourceMaxListeners ;
476+ dest . _maxListeners = destMaxListeners ;
477+ }
478+
320479module . exports = {
321480 MessagePort,
322481 MessageChannel,
323482 threadId,
324483 Worker,
325484 setupChild,
326- isMainThread
485+ isMainThread,
486+ workerStdio
327487} ;
0 commit comments