diff --git a/lib/protocol/connection.js b/lib/protocol/connection.js index 2b86b7f..355ddec 100644 --- a/lib/protocol/connection.js +++ b/lib/protocol/connection.js @@ -214,6 +214,11 @@ Connection.prototype._insert = function _insert(stream, priority) { }; Connection.prototype._reprioritize = function _reprioritize(stream, priority) { + this._removePrioritisedStream(stream); + this._insert(stream, priority); +}; + +Connection.prototype._removePrioritisedStream = function _removePrioritisedStream(stream) { var bucket = this._streamPriorities[stream._priority]; var index = bucket.indexOf(stream); assert(index !== -1); @@ -221,8 +226,6 @@ Connection.prototype._reprioritize = function _reprioritize(stream, priority) { if (bucket.length === 0) { delete this._streamPriorities[stream._priority]; } - - this._insert(stream, priority); }; // Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to @@ -246,9 +249,17 @@ Connection.prototype.createStream = function createStream() { var stream = new Stream(this._log, this); this._allocatePriority(stream); + stream.on('end', this._removeStream.bind(this, stream)); + return stream; }; +Connection.prototype._removeStream = function _removeStream(stream) { + this._log.trace('Removing outbound stream.'); + delete this._streamIds[stream.id]; + this._removePrioritisedStream(stream); +}; + // Multiplexing // ------------ @@ -290,7 +301,7 @@ priority_loop: // 2. if there's no frame, skip this stream // 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip // this stream - // 4. adding stream to the bucket of the next round + // 4. adding stream to the bucket of the next round unless it has ended // 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already) // 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream // 7. forwarding the frame, changing `streamCount` as appropriate @@ -299,6 +310,7 @@ priority_loop: while (bucket.length > 0) { for (var index = 0; index < bucket.length; index++) { var stream = bucket[index]; + if(!stream || !stream.upstream) continue; var frame = stream.upstream.read((this._window > 0) ? this._window : -1); if (!frame) { @@ -307,8 +319,11 @@ priority_loop: stream.upstream.unshift(frame); continue; } - - nextBucket.push(stream); + if (!stream._ended) { + nextBucket.push(stream); + } else { + delete this._streamIds[stream.id]; + } if (frame.stream === undefined) { frame.stream = stream.id || this._allocateId(stream); @@ -323,8 +338,12 @@ priority_loop: var moreNeeded = this.push(frame); this._changeStreamCount(frame.count_change); - assert(moreNeeded !== null); // The frame shouldn't be unforwarded - if (moreNeeded === false) { + //assert(moreNeeded !== null); // The frame shouldn't be unforwarded + if (moreNeeded === null) { + this._log.error('The frame shouldn\'t be unforwarded'); + } + + if (!moreNeeded) { break priority_loop; } } @@ -577,7 +596,8 @@ Connection.prototype.close = function close(error) { }; Connection.prototype._receiveGoaway = function _receiveGoaway(frame) { - this._log.debug({ error: frame.error }, 'Other end closed the connection'); + // this._log.debug({ error: frame.error }, 'Other end closed the connection'); + this._log.error({ frame: frame }, '_receiveGoaway, Other end closed the connection'); this.push(null); this._closed = true; if (frame.error !== 'NO_ERROR') { diff --git a/lib/protocol/framer.js b/lib/protocol/framer.js index 244e60a..fe62e43 100644 --- a/lib/protocol/framer.js +++ b/lib/protocol/framer.js @@ -111,11 +111,11 @@ Deserializer.prototype._transform = function _transform(chunk, encoding, done) { // If it's header then the parsed data is stored in a temporary variable and then the // deserializer waits for the specified length payload. if ((this._cursor === this._buffer.length) && this._waitingForHeader) { - var payloadSize = Deserializer.commonHeader(this._buffer, this._frame); + var payloadSize = Deserializer.commonHeader(this._buffer, this._frame, this._log); if (payloadSize <= MAX_PAYLOAD_SIZE) { this._next(payloadSize); } else { - this.emit('error', 'FRAME_SIZE_ERROR'); + this._log.error({ payloadSize: payloadSize, max: MAX_PAYLOAD_SIZE }, 'FRAME_SIZE_ERROR, _transform'); return; } } @@ -127,7 +127,7 @@ Deserializer.prototype._transform = function _transform(chunk, encoding, done) { // will also run. if ((this._cursor === this._buffer.length) && !this._waitingForHeader) { if (this._frame.type) { - var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role); + var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role, this._log); if (error) { this._log.error('Incoming frame parsing error: ' + error); this.emit('error', error); @@ -234,8 +234,9 @@ Serializer.commonHeader = function writeCommonHeader(frame, buffers) { return size; }; -Deserializer.commonHeader = function readCommonHeader(buffer, frame) { +Deserializer.commonHeader = function readCommonHeader(buffer, frame, logger) { if (buffer.length < 9) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readCommonHeader, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } @@ -297,12 +298,13 @@ Serializer.DATA = function writeData(frame, buffers) { buffers.push(frame.data); }; -Deserializer.DATA = function readData(buffer, frame) { +Deserializer.DATA = function readData(buffer, frame, role, logger) { var dataOffset = 0; var paddingLength = 0; if (frame.flags.PADDED) { if (buffer.length < 1) { // We must have at least one byte for padding control, but we don't. Bad peer! + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readData, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } paddingLength = (buffer.readUInt8(dataOffset) & 0xff); @@ -376,7 +378,7 @@ Serializer.HEADERS = function writeHeadersPriority(frame, buffers) { buffers.push(frame.data); }; -Deserializer.HEADERS = function readHeadersPriority(buffer, frame) { +Deserializer.HEADERS = function readHeadersPriority(buffer, frame, role, logger) { var minFrameLength = 0; if (frame.flags.PADDED) { minFrameLength += 1; @@ -386,6 +388,8 @@ Deserializer.HEADERS = function readHeadersPriority(buffer, frame) { } if (buffer.length < minFrameLength) { // Peer didn't send enough data - bad peer! + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readHeadersPriority, buffer.length: ' + + buffer.length + ' minFrameLength: ' + minFrameLength); return 'FRAME_SIZE_ERROR'; } @@ -410,6 +414,8 @@ Deserializer.HEADERS = function readHeadersPriority(buffer, frame) { if (paddingLength) { if ((buffer.length - dataOffset) < paddingLength) { // Not enough data left to satisfy the advertised padding - bad peer! + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readHeadersPriority, buffer.length: ' + buffer.length + + ' dataOffset: ' + dataOffset + ' paddingLength: ' + paddingLength); return 'FRAME_SIZE_ERROR'; } frame.data = buffer.slice(dataOffset, -1 * paddingLength); @@ -454,9 +460,10 @@ Serializer.PRIORITY = function writePriority(frame, buffers) { buffers.push(buffer); }; -Deserializer.PRIORITY = function readPriority(buffer, frame) { +Deserializer.PRIORITY = function readPriority(buffer, frame, role, logger) { if (buffer.length < 5) { // PRIORITY frames are 5 bytes long. Bad peer! + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPriority, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } var dependencyData = new Buffer(4); @@ -497,9 +504,10 @@ Serializer.RST_STREAM = function writeRstStream(frame, buffers) { buffers.push(buffer); }; -Deserializer.RST_STREAM = function readRstStream(buffer, frame) { +Deserializer.RST_STREAM = function readRstStream(buffer, frame, role, logger) { if (buffer.length < 4) { // RST_STREAM is 4 bytes long. Bad peer! + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readRstStream, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } frame.error = errorCodes[buffer.readUInt32BE(0)]; @@ -564,13 +572,14 @@ Serializer.SETTINGS = function writeSettings(frame, buffers) { buffers.push(buffer); }; -Deserializer.SETTINGS = function readSettings(buffer, frame, role) { +Deserializer.SETTINGS = function readSettings(buffer, frame, role, logger) { frame.settings = {}; // Receipt of a SETTINGS frame with the ACK flag set and a length // field value other than 0 MUST be treated as a connection error // (Section 5.4.1) of type FRAME_SIZE_ERROR. if(frame.flags.ACK && buffer.length != 0) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readSettings, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } @@ -661,14 +670,16 @@ Serializer.PUSH_PROMISE = function writePushPromise(frame, buffers) { buffers.push(frame.data); }; -Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) { +Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame, role, logger) { if (buffer.length < 4) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } var dataOffset = 0; var paddingLength = 0; if (frame.flags.PADDED) { if (buffer.length < 5) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise PADDED, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } paddingLength = (buffer.readUInt8(dataOffset) & 0xff); @@ -678,6 +689,8 @@ Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) { dataOffset += 4; if (paddingLength) { if ((buffer.length - dataOffset) < paddingLength) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise, buffer.length: ' + buffer.length + + ' dataOffset: ' + dataOffset + ' paddingLength: ' + paddingLength); return 'FRAME_SIZE_ERROR'; } frame.data = buffer.slice(dataOffset, -1 * paddingLength); @@ -709,8 +722,9 @@ Serializer.PING = function writePing(frame, buffers) { buffers.push(frame.data); }; -Deserializer.PING = function readPing(buffer, frame) { +Deserializer.PING = function readPing(buffer, frame, role, logger) { if (buffer.length !== 8) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPing, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } frame.data = buffer; @@ -758,9 +772,10 @@ Serializer.GOAWAY = function writeGoaway(frame, buffers) { buffers.push(buffer); }; -Deserializer.GOAWAY = function readGoaway(buffer, frame) { - if (buffer.length !== 8) { +Deserializer.GOAWAY = function readGoaway(buffer, frame, role, logger) { + if (buffer.length < 8) { // GOAWAY must have 8 bytes + logger.error({ frame: frame }, 'FRAME_SIZE_ERROR, readGoAway, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } frame.last_stream = buffer.readUInt32BE(0) & 0x7fffffff; @@ -769,6 +784,13 @@ Deserializer.GOAWAY = function readGoaway(buffer, frame) { // Unknown error types are to be considered equivalent to INTERNAL ERROR frame.error = 'INTERNAL_ERROR'; } + // Read remaining data into "debug_data" + // https://http2.github.io/http2-spec/#GOAWAY + // Endpoints MAY append opaque data to the payload of any GOAWAY frame + if (buffer.length > 8) { + logger.error({ frame: frame }, 'readGoAway debug_data, buffer.length: ' + buffer.length); + frame.debug_data = buffer.slice(8); + } }; // [WINDOW_UPDATE](https://tools.ietf.org/html/rfc7540#section-6.9) @@ -799,8 +821,9 @@ Serializer.WINDOW_UPDATE = function writeWindowUpdate(frame, buffers) { buffers.push(buffer); }; -Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame) { +Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame, role, logger) { if (buffer.length !== WINDOW_UPDATE_PAYLOAD_SIZE) { + logger.error({ frame: frame }, 'FRAME_SIZE_ERROR, readWindowUpdate, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } frame.window_size = buffer.readUInt32BE(0) & 0x7fffffff; @@ -1034,12 +1057,14 @@ function unescape(s) { return t; } -Deserializer.ALTSVC = function readAltSvc(buffer, frame) { +Deserializer.ALTSVC = function readAltSvc(buffer, frame, role, logger) { if (buffer.length < 2) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readAltSvc, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } var originLength = buffer.readUInt16BE(0); if ((buffer.length - 2) < originLength) { + logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readAltSvc 2, buffer.length: ' + buffer.length); return 'FRAME_SIZE_ERROR'; } frame.origin = buffer.toString('ascii', 2, 2 + originLength); diff --git a/lib/protocol/stream.js b/lib/protocol/stream.js index 6d520b9..8396354 100644 --- a/lib/protocol/stream.js +++ b/lib/protocol/stream.js @@ -313,6 +313,7 @@ Stream.prototype._write = function _write(buffer, encoding, ready) { // * Call ready when upstream is ready to receive more frames. if (moreNeeded) { + this._log.error('Stream._write moreNeeded'); ready(); } else { this._sendMore = ready;