File tree Expand file tree Collapse file tree 3 files changed +56
-0
lines changed Expand file tree Collapse file tree 3 files changed +56
-0
lines changed Original file line number Diff line number Diff line change @@ -609,11 +609,25 @@ void MessagePort::OnMessage() {
609609 HandleScope handle_scope (env ()->isolate ());
610610 Local<Context> context = object (env ()->isolate ())->CreationContext ();
611611
612+ ssize_t processing_limit;
613+ {
614+ Mutex::ScopedLock (data_->mutex_ );
615+ processing_limit = data_->incoming_messages_ .size ();
616+ }
617+
612618 // data_ can only ever be modified by the owner thread, so no need to lock.
613619 // However, the message port may be transferred while it is processing
614620 // messages, so we need to check that this handle still owns its `data_` field
615621 // on every iteration.
616622 while (data_) {
623+ if (--processing_limit < 0 ) {
624+ // Prevent event loop starvation by only processing those messages without
625+ // interruption that were already present when the OnMessage() call was
626+ // first triggered.
627+ TriggerAsync ();
628+ return ;
629+ }
630+
617631 HandleScope handle_scope (env ()->isolate ());
618632 Context::Scope context_scope (context);
619633
Original file line number Diff line number Diff line change 1+ 'use strict' ;
2+ const common = require ( '../common' ) ;
3+
4+ const { MessageChannel } = require ( 'worker_threads' ) ;
5+
6+ // Make sure that closing a message port while receiving messages on it does
7+ // not stop messages that are already in the queue from being emitted.
8+
9+ const { port1, port2 } = new MessageChannel ( ) ;
10+
11+ port1 . on ( 'message' , common . mustCall ( ( ) => {
12+ port1 . close ( ) ;
13+ } , 2 ) ) ;
14+ port2 . postMessage ( 'foo' ) ;
15+ port2 . postMessage ( 'bar' ) ;
Original file line number Diff line number Diff line change 1+ 'use strict' ;
2+ const common = require ( '../common' ) ;
3+ const assert = require ( 'assert' ) ;
4+
5+ const { MessageChannel } = require ( 'worker_threads' ) ;
6+
7+ // Make sure that an infinite asynchronous .on('message')/postMessage loop
8+ // does not lead to a stack overflow and does not starve the event loop.
9+ // We schedule timeouts both from before the the .on('message') handler and
10+ // inside of it, which both should run.
11+
12+ const { port1, port2 } = new MessageChannel ( ) ;
13+ let count = 0 ;
14+ port1 . on ( 'message' , ( ) => {
15+ if ( count === 0 ) {
16+ setTimeout ( common . mustCall ( ( ) => {
17+ port1 . close ( ) ;
18+ } ) , 0 ) ;
19+ }
20+
21+ port2 . postMessage ( 0 ) ;
22+ assert ( count ++ < 10000 , `hit ${ count } loop iterations` ) ;
23+ } ) ;
24+
25+ port2 . postMessage ( 0 ) ;
26+
27+ setTimeout ( common . mustCall ( ) , 0 ) ;
You can’t perform that action at this time.
0 commit comments