@@ -397,6 +397,25 @@ function setupChannel(target, channel) {
397397 target . _channel = channel ;
398398 target . _handleQueue = null ;
399399
400+ const control = new class extends EventEmitter {
401+ constructor ( ) {
402+ super ( ) ;
403+ this . channel = channel ;
404+ this . refs = 0 ;
405+ }
406+ ref ( ) {
407+ if ( ++ this . refs === 1 ) {
408+ this . channel . ref ( ) ;
409+ }
410+ }
411+ unref ( ) {
412+ if ( -- this . refs === 0 ) {
413+ this . channel . unref ( ) ;
414+ this . emit ( 'unref' ) ;
415+ }
416+ }
417+ } ;
418+
400419 var decoder = new StringDecoder ( 'utf8' ) ;
401420 var jsonBuffer = '' ;
402421 channel . buffering = false ;
@@ -446,7 +465,7 @@ function setupChannel(target, channel) {
446465 target . _handleQueue = null ;
447466
448467 queue . forEach ( function ( args ) {
449- target . _send ( args . message , args . handle , false ) ;
468+ target . _send ( args . message , args . handle , false , args . callback ) ;
450469 } ) ;
451470
452471 // Process a pending disconnect (if any).
@@ -478,14 +497,24 @@ function setupChannel(target, channel) {
478497 } ) ;
479498 } ) ;
480499
481- target . send = function ( message , handle ) {
482- if ( ! this . connected )
483- this . emit ( 'error' , new Error ( 'channel closed' ) ) ;
484- else
485- this . _send ( message , handle , false ) ;
500+ target . send = function ( message , handle , callback ) {
501+ if ( typeof handle === 'function' ) {
502+ callback = handle ;
503+ handle = undefined ;
504+ }
505+ if ( this . connected ) {
506+ this . _send ( message , handle , false , callback ) ;
507+ return ;
508+ }
509+ const ex = new Error ( 'channel closed' ) ;
510+ if ( typeof callback === 'function' ) {
511+ process . nextTick ( callback , ex ) ;
512+ } else {
513+ this . emit ( 'error' , ex ) ; // FIXME(bnoordhuis) Defer to next tick.
514+ }
486515 } ;
487516
488- target . _send = function ( message , handle , swallowErrors ) {
517+ target . _send = function ( message , handle , swallowErrors , callback ) {
489518 assert ( this . connected || this . _channel ) ;
490519
491520 if ( message === undefined )
@@ -516,7 +545,11 @@ function setupChannel(target, channel) {
516545
517546 // Queue-up message and handle if we haven't received ACK yet.
518547 if ( this . _handleQueue ) {
519- this . _handleQueue . push ( { message : message . msg , handle : handle } ) ;
548+ this . _handleQueue . push ( {
549+ callback : callback ,
550+ handle : handle ,
551+ message : message . msg ,
552+ } ) ;
520553 return ;
521554 }
522555
@@ -538,24 +571,43 @@ function setupChannel(target, channel) {
538571 } else if ( this . _handleQueue &&
539572 ! ( message && message . cmd === 'NODE_HANDLE_ACK' ) ) {
540573 // Queue request anyway to avoid out-of-order messages.
541- this . _handleQueue . push ( { message : message , handle : null } ) ;
574+ this . _handleQueue . push ( {
575+ callback : callback ,
576+ handle : null ,
577+ message : message ,
578+ } ) ;
542579 return ;
543580 }
544581
545582 var req = new WriteWrap ( ) ;
546- req . oncomplete = nop ;
583+ req . async = false ;
584+
547585 var string = JSON . stringify ( message ) + '\n' ;
548586 var err = channel . writeUtf8String ( req , string , handle ) ;
549587
550- if ( err ) {
551- if ( ! swallowErrors )
552- this . emit ( 'error' , errnoException ( err , 'write' ) ) ;
553- } else if ( handle && ! this . _handleQueue ) {
554- this . _handleQueue = [ ] ;
555- }
556-
557- if ( obj && obj . postSend ) {
558- req . oncomplete = obj . postSend . bind ( null , handle ) ;
588+ if ( err === 0 ) {
589+ if ( handle && ! this . _handleQueue )
590+ this . _handleQueue = [ ] ;
591+ req . oncomplete = function ( ) {
592+ if ( this . async === true )
593+ control . unref ( ) ;
594+ if ( obj && obj . postSend )
595+ obj . postSend ( handle ) ;
596+ if ( typeof callback === 'function' )
597+ callback ( null ) ;
598+ } ;
599+ if ( req . async === true ) {
600+ control . ref ( ) ;
601+ } else {
602+ process . nextTick ( function ( ) { req . oncomplete ( ) ; } ) ;
603+ }
604+ } else if ( ! swallowErrors ) {
605+ const ex = errnoException ( err , 'write' ) ;
606+ if ( typeof callback === 'function' ) {
607+ process . nextTick ( callback , ex ) ;
608+ } else {
609+ this . emit ( 'error' , ex ) ; // FIXME(bnoordhuis) Defer to next tick.
610+ }
559611 }
560612
561613 /* If the master is > 2 read() calls behind, please stop sending. */
@@ -616,6 +668,7 @@ function setupChannel(target, channel) {
616668 } ;
617669
618670 channel . readStart ( ) ;
671+ return control ;
619672}
620673
621674
0 commit comments