Skip to content

Commit

Permalink
AssertionError: false == true in lib/protocol/connection.js
Browse files Browse the repository at this point in the history
  • Loading branch information
devinterx committed Jun 20, 2017
1 parent 2c4c310 commit b0b8d7a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 23 deletions.
36 changes: 28 additions & 8 deletions lib/protocol/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,15 +214,18 @@ Connection.prototype._insert = function _insert(stream, priority) {
};

Connection.prototype._reprioritize = function _reprioritize(stream, priority) {
this._removePrioritisedStream(stream);
this._insert(stream, priority);
};

Connection.prototype._removePrioritisedStream = function _removePrioritisedStream(stream) {
var bucket = this._streamPriorities[stream._priority];
var index = bucket.indexOf(stream);
assert(index !== -1);
bucket.splice(index, 1);
if (bucket.length === 0) {
delete this._streamPriorities[stream._priority];
}

this._insert(stream, priority);
};

// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
Expand All @@ -246,9 +249,17 @@ Connection.prototype.createStream = function createStream() {
var stream = new Stream(this._log, this);
this._allocatePriority(stream);

stream.on('end', this._removeStream.bind(this, stream));

return stream;
};

Connection.prototype._removeStream = function _removeStream(stream) {
this._log.trace('Removing outbound stream.');
delete this._streamIds[stream.id];
this._removePrioritisedStream(stream);
};

// Multiplexing
// ------------

Expand Down Expand Up @@ -290,7 +301,7 @@ priority_loop:
// 2. if there's no frame, skip this stream
// 3. if forwarding this frame would make `streamCount` greater than `streamLimit`, skip
// this stream
// 4. adding stream to the bucket of the next round
// 4. adding stream to the bucket of the next round unless it has ended
// 5. assigning an ID to the frame (allocating an ID to the stream if there isn't already)
// 6. if forwarding a PUSH_PROMISE, allocate ID to the promised stream
// 7. forwarding the frame, changing `streamCount` as appropriate
Expand All @@ -299,6 +310,7 @@ priority_loop:
while (bucket.length > 0) {
for (var index = 0; index < bucket.length; index++) {
var stream = bucket[index];
if(!stream || !stream.upstream) continue;
var frame = stream.upstream.read((this._window > 0) ? this._window : -1);

if (!frame) {
Expand All @@ -307,8 +319,11 @@ priority_loop:
stream.upstream.unshift(frame);
continue;
}

nextBucket.push(stream);
if (!stream._ended) {
nextBucket.push(stream);
} else {
delete this._streamIds[stream.id];
}

if (frame.stream === undefined) {
frame.stream = stream.id || this._allocateId(stream);
Expand All @@ -323,8 +338,12 @@ priority_loop:
var moreNeeded = this.push(frame);
this._changeStreamCount(frame.count_change);

assert(moreNeeded !== null); // The frame shouldn't be unforwarded
if (moreNeeded === false) {
//assert(moreNeeded !== null); // The frame shouldn't be unforwarded
if (moreNeeded === null) {
this._log.error('The frame shouldn\'t be unforwarded');
}

if (!moreNeeded) {
break priority_loop;
}
}
Expand Down Expand Up @@ -577,7 +596,8 @@ Connection.prototype.close = function close(error) {
};

Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
this._log.debug({ error: frame.error }, 'Other end closed the connection');
// this._log.debug({ error: frame.error }, 'Other end closed the connection');
this._log.error({ frame: frame }, '_receiveGoaway, Other end closed the connection');
this.push(null);
this._closed = true;
if (frame.error !== 'NO_ERROR') {
Expand Down
55 changes: 40 additions & 15 deletions lib/protocol/framer.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ Deserializer.prototype._transform = function _transform(chunk, encoding, done) {
// If it's header then the parsed data is stored in a temporary variable and then the
// deserializer waits for the specified length payload.
if ((this._cursor === this._buffer.length) && this._waitingForHeader) {
var payloadSize = Deserializer.commonHeader(this._buffer, this._frame);
var payloadSize = Deserializer.commonHeader(this._buffer, this._frame, this._log);
if (payloadSize <= MAX_PAYLOAD_SIZE) {
this._next(payloadSize);
} else {
this.emit('error', 'FRAME_SIZE_ERROR');
this._log.error({ payloadSize: payloadSize, max: MAX_PAYLOAD_SIZE }, 'FRAME_SIZE_ERROR, _transform');
return;
}
}
Expand All @@ -127,7 +127,7 @@ Deserializer.prototype._transform = function _transform(chunk, encoding, done) {
// will also run.
if ((this._cursor === this._buffer.length) && !this._waitingForHeader) {
if (this._frame.type) {
var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role);
var error = Deserializer[this._frame.type](this._buffer, this._frame, this._role, this._log);
if (error) {
this._log.error('Incoming frame parsing error: ' + error);
this.emit('error', error);
Expand Down Expand Up @@ -234,8 +234,9 @@ Serializer.commonHeader = function writeCommonHeader(frame, buffers) {
return size;
};

Deserializer.commonHeader = function readCommonHeader(buffer, frame) {
Deserializer.commonHeader = function readCommonHeader(buffer, frame, logger) {
if (buffer.length < 9) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readCommonHeader, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}

Expand Down Expand Up @@ -297,12 +298,13 @@ Serializer.DATA = function writeData(frame, buffers) {
buffers.push(frame.data);
};

Deserializer.DATA = function readData(buffer, frame) {
Deserializer.DATA = function readData(buffer, frame, role, logger) {
var dataOffset = 0;
var paddingLength = 0;
if (frame.flags.PADDED) {
if (buffer.length < 1) {
// We must have at least one byte for padding control, but we don't. Bad peer!
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readData, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
paddingLength = (buffer.readUInt8(dataOffset) & 0xff);
Expand Down Expand Up @@ -376,7 +378,7 @@ Serializer.HEADERS = function writeHeadersPriority(frame, buffers) {
buffers.push(frame.data);
};

Deserializer.HEADERS = function readHeadersPriority(buffer, frame) {
Deserializer.HEADERS = function readHeadersPriority(buffer, frame, role, logger) {
var minFrameLength = 0;
if (frame.flags.PADDED) {
minFrameLength += 1;
Expand All @@ -386,6 +388,8 @@ Deserializer.HEADERS = function readHeadersPriority(buffer, frame) {
}
if (buffer.length < minFrameLength) {
// Peer didn't send enough data - bad peer!
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readHeadersPriority, buffer.length: '
+ buffer.length + ' minFrameLength: ' + minFrameLength);
return 'FRAME_SIZE_ERROR';
}

Expand All @@ -410,6 +414,8 @@ Deserializer.HEADERS = function readHeadersPriority(buffer, frame) {
if (paddingLength) {
if ((buffer.length - dataOffset) < paddingLength) {
// Not enough data left to satisfy the advertised padding - bad peer!
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readHeadersPriority, buffer.length: ' + buffer.length
+ ' dataOffset: ' + dataOffset + ' paddingLength: ' + paddingLength);
return 'FRAME_SIZE_ERROR';
}
frame.data = buffer.slice(dataOffset, -1 * paddingLength);
Expand Down Expand Up @@ -454,9 +460,10 @@ Serializer.PRIORITY = function writePriority(frame, buffers) {
buffers.push(buffer);
};

Deserializer.PRIORITY = function readPriority(buffer, frame) {
Deserializer.PRIORITY = function readPriority(buffer, frame, role, logger) {
if (buffer.length < 5) {
// PRIORITY frames are 5 bytes long. Bad peer!
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPriority, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
var dependencyData = new Buffer(4);
Expand Down Expand Up @@ -497,9 +504,10 @@ Serializer.RST_STREAM = function writeRstStream(frame, buffers) {
buffers.push(buffer);
};

Deserializer.RST_STREAM = function readRstStream(buffer, frame) {
Deserializer.RST_STREAM = function readRstStream(buffer, frame, role, logger) {
if (buffer.length < 4) {
// RST_STREAM is 4 bytes long. Bad peer!
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readRstStream, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
frame.error = errorCodes[buffer.readUInt32BE(0)];
Expand Down Expand Up @@ -564,13 +572,14 @@ Serializer.SETTINGS = function writeSettings(frame, buffers) {
buffers.push(buffer);
};

Deserializer.SETTINGS = function readSettings(buffer, frame, role) {
Deserializer.SETTINGS = function readSettings(buffer, frame, role, logger) {
frame.settings = {};

// Receipt of a SETTINGS frame with the ACK flag set and a length
// field value other than 0 MUST be treated as a connection error
// (Section 5.4.1) of type FRAME_SIZE_ERROR.
if(frame.flags.ACK && buffer.length != 0) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readSettings, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}

Expand Down Expand Up @@ -661,14 +670,16 @@ Serializer.PUSH_PROMISE = function writePushPromise(frame, buffers) {
buffers.push(frame.data);
};

Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) {
Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame, role, logger) {
if (buffer.length < 4) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
var dataOffset = 0;
var paddingLength = 0;
if (frame.flags.PADDED) {
if (buffer.length < 5) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise PADDED, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
paddingLength = (buffer.readUInt8(dataOffset) & 0xff);
Expand All @@ -678,6 +689,8 @@ Deserializer.PUSH_PROMISE = function readPushPromise(buffer, frame) {
dataOffset += 4;
if (paddingLength) {
if ((buffer.length - dataOffset) < paddingLength) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPushPromise, buffer.length: ' + buffer.length
+ ' dataOffset: ' + dataOffset + ' paddingLength: ' + paddingLength);
return 'FRAME_SIZE_ERROR';
}
frame.data = buffer.slice(dataOffset, -1 * paddingLength);
Expand Down Expand Up @@ -709,8 +722,9 @@ Serializer.PING = function writePing(frame, buffers) {
buffers.push(frame.data);
};

Deserializer.PING = function readPing(buffer, frame) {
Deserializer.PING = function readPing(buffer, frame, role, logger) {
if (buffer.length !== 8) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readPing, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
frame.data = buffer;
Expand Down Expand Up @@ -758,9 +772,10 @@ Serializer.GOAWAY = function writeGoaway(frame, buffers) {
buffers.push(buffer);
};

Deserializer.GOAWAY = function readGoaway(buffer, frame) {
if (buffer.length !== 8) {
Deserializer.GOAWAY = function readGoaway(buffer, frame, role, logger) {
if (buffer.length < 8) {
// GOAWAY must have 8 bytes
logger.error({ frame: frame }, 'FRAME_SIZE_ERROR, readGoAway, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
frame.last_stream = buffer.readUInt32BE(0) & 0x7fffffff;
Expand All @@ -769,6 +784,13 @@ Deserializer.GOAWAY = function readGoaway(buffer, frame) {
// Unknown error types are to be considered equivalent to INTERNAL ERROR
frame.error = 'INTERNAL_ERROR';
}
// Read remaining data into "debug_data"
// https://http2.github.io/http2-spec/#GOAWAY
// Endpoints MAY append opaque data to the payload of any GOAWAY frame
if (buffer.length > 8) {
logger.error({ frame: frame }, 'readGoAway debug_data, buffer.length: ' + buffer.length);
frame.debug_data = buffer.slice(8);
}
};

// [WINDOW_UPDATE](https://tools.ietf.org/html/rfc7540#section-6.9)
Expand Down Expand Up @@ -799,8 +821,9 @@ Serializer.WINDOW_UPDATE = function writeWindowUpdate(frame, buffers) {
buffers.push(buffer);
};

Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame) {
Deserializer.WINDOW_UPDATE = function readWindowUpdate(buffer, frame, role, logger) {
if (buffer.length !== WINDOW_UPDATE_PAYLOAD_SIZE) {
logger.error({ frame: frame }, 'FRAME_SIZE_ERROR, readWindowUpdate, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
frame.window_size = buffer.readUInt32BE(0) & 0x7fffffff;
Expand Down Expand Up @@ -1034,12 +1057,14 @@ function unescape(s) {
return t;
}

Deserializer.ALTSVC = function readAltSvc(buffer, frame) {
Deserializer.ALTSVC = function readAltSvc(buffer, frame, role, logger) {
if (buffer.length < 2) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readAltSvc, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
var originLength = buffer.readUInt16BE(0);
if ((buffer.length - 2) < originLength) {
logger.error({ buffer: buffer }, 'FRAME_SIZE_ERROR, readAltSvc 2, buffer.length: ' + buffer.length);
return 'FRAME_SIZE_ERROR';
}
frame.origin = buffer.toString('ascii', 2, 2 + originLength);
Expand Down
1 change: 1 addition & 0 deletions lib/protocol/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ Stream.prototype._write = function _write(buffer, encoding, ready) {

// * Call ready when upstream is ready to receive more frames.
if (moreNeeded) {
this._log.error('Stream._write moreNeeded');
ready();
} else {
this._sendMore = ready;
Expand Down

0 comments on commit b0b8d7a

Please sign in to comment.