diff --git a/lib/internal/quic/core.js b/lib/internal/quic/core.js index fd62205f212101..a8d35c47a8633e 100644 --- a/lib/internal/quic/core.js +++ b/lib/internal/quic/core.js @@ -476,10 +476,14 @@ function onStreamReady(streamHandle, id, push_id) { const stream = new QuicStream({ ...session[kStreamOptions], writable: !(id & 0b10), - }, session, push_id); - stream[kSetHandle](streamHandle); - session[kAddStream](id, stream); - process.nextTick(emit.bind(session, 'stream', stream)); + }, session, streamHandle, push_id); + process.nextTick(() => { + try { + session.emit('stream', stream); + } catch (error) { + stream.destroy(error); + } + }); } // Called by the C++ internals when a stream is closed and @@ -2188,16 +2192,11 @@ class QuicSession extends EventEmitter { if (handle === undefined) throw new ERR_OPERATION_FAILED('Unable to create QuicStream'); - const stream = new QuicStream({ + return new QuicStream({ highWaterMark, defaultEncoding, readable: !halfOpen - }, this); - - stream[kSetHandle](handle); - this[kAddStream](stream.id, stream); - - return stream; + }, this, handle); } get duration() { @@ -2556,7 +2555,7 @@ class QuicStream extends Duplex { stats: undefined, }; - constructor(options, session, push_id) { + constructor(options, session, handle, push_id) { const { highWaterMark, defaultEncoding, @@ -2583,11 +2582,7 @@ class QuicStream extends Duplex { this._readableState.readingMore = true; this.on('pause', streamOnPause); - // The QuicStream writes are corked until kSetHandle - // is set, ensuring that writes are buffered in JavaScript - // until we have somewhere to send them. - // TODO(@jasnell): We need a better mechanism for this. - this.cork(); + this[kSetHandle](handle); } // Set handle is called once the QuicSession has been able @@ -2607,8 +2602,7 @@ class QuicStream extends Duplex { state.dataRateHistogram = new Histogram(handle.rate); state.dataSizeHistogram = new Histogram(handle.size); state.dataAckHistogram = new Histogram(handle.ack); - this.uncork(); - this.emit('ready'); + state.session[kAddStream](state.id, this); } else { if (state.dataRateHistogram) state.dataRateHistogram[kDestroyHistogram](); @@ -3008,15 +3002,11 @@ class QuicStream extends Duplex { 'Push is either disabled or currently blocked.'); } - const stream = new QuicStream({ + return new QuicStream({ readable: false, highWaterMark, defaultEncoding, - }, this.session); - - stream[kSetHandle](handle); - this.session[kAddStream](stream.id, stream); - return stream; + }, this.session, handle); } submitInformationalHeaders(headers = {}) {