From 8c440447642324a0e40175b0455e8ca32335db65 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Thu, 7 Nov 2019 09:16:42 -0500 Subject: [PATCH] feat: add `MessageStream` for streamed wire protocol messaging This is a duplex stream that can read and write MongoDB wire protocol messages, with optional compression. This bring in a dependency on the `bl` module. --- lib/core/cmap/message_stream.js | 177 +++++++++++++++++++++++++ package.json | 1 + test/unit/cmap/message_stream_tests.js | 120 +++++++++++++++++ 3 files changed, 298 insertions(+) create mode 100644 lib/core/cmap/message_stream.js create mode 100644 test/unit/cmap/message_stream_tests.js diff --git a/lib/core/cmap/message_stream.js b/lib/core/cmap/message_stream.js new file mode 100644 index 0000000000..00550ded0b --- /dev/null +++ b/lib/core/cmap/message_stream.js @@ -0,0 +1,177 @@ +'use strict'; + +const Duplex = require('stream').Duplex; +const BufferList = require('bl'); +const MongoParseError = require('../error').MongoParseError; +const decompress = require('../wireprotocol/compression').decompress; +const Response = require('../connection/commands').Response; +const BinMsg = require('../connection/msg').BinMsg; +const MongoError = require('../error').MongoError; +const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED; +const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG; +const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; +const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE; +const opcodes = require('../wireprotocol/shared').opcodes; +const compress = require('../wireprotocol/compression').compress; +const compressorIDs = require('../wireprotocol/compression').compressorIDs; +const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; +const Msg = require('../connection/msg').Msg; + +const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; +const kBuffer = Symbol('buffer'); + +/** + * A duplex stream that is capable of reading and writing raw wire protocol messages, with + * support for optional compression + */ +class MessageStream extends Duplex { + constructor(options) { + options = options || {}; + super(options); + + this.bson = options.bson; + this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; + + this[kBuffer] = new BufferList(); + } + + _write(chunk, _, callback) { + this[kBuffer].append(chunk); + + while (this[kBuffer].length >= 4) { + const sizeOfMessage = this[kBuffer].readInt32LE(0); + if (sizeOfMessage < 0) { + callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); + return; + } + + if (sizeOfMessage > this.maxBsonMessageSize) { + callback( + new MongoParseError( + `Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}` + ) + ); + return; + } + + if (sizeOfMessage > this[kBuffer].length) { + callback(); + return; + } + + const messageBuffer = this[kBuffer].slice(0, sizeOfMessage); + processMessage(this, messageBuffer, callback); + this[kBuffer].consume(sizeOfMessage); + } + } + + _read(/* size */) { + // NOTE: This implementation is empty because we explicitly push data to be read + // when `writeMessage` is called. + return; + } + + writeCommand(command, options, callback) { + // TODO: agreed compressor should live in `StreamDescription` + const shouldCompress = options && !!options.agreedCompressor; + if (!shouldCompress || !canCompress(command)) { + this.push(Buffer.concat(command.toBin())); + return; + } + + // otherwise, compress the message + const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBind()); + const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); + + // Extract information needed for OP_COMPRESSED from the uncompressed message + const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); + + // Compress the message body + compress({ options }, messageToBeCompressed, (err, compressedMessage) => { + if (err) { + callback(err, null); + return; + } + + // Create the msgHeader of OP_COMPRESSED + const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); + msgHeader.writeInt32LE( + MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, + 0 + ); // messageLength + msgHeader.writeInt32LE(command.requestId, 4); // requestID + msgHeader.writeInt32LE(0, 8); // responseTo (zero) + msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode + + // Create the compression details of OP_COMPRESSED + const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); + compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode + compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader + compressionDetails.writeUInt8(compressorIDs[options.agreedCompressor], 8); // compressorID + + this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); + }); + } +} + +// Return whether a command contains an uncompressible command term +// Will return true if command contains no uncompressible command terms +function canCompress(command) { + const commandDoc = command instanceof Msg ? command.command : command.query; + const commandName = Object.keys(commandDoc)[0]; + return uncompressibleCommands.indexOf(commandName) === -1; +} + +function processMessage(stream, message, callback) { + const messageHeader = { + messageLength: message.readInt32LE(0), + requestId: message.readInt32LE(4), + responseTo: message.readInt32LE(8), + opCode: message.readInt32LE(12) + }; + + const ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; + const responseOptions = stream.responseOptions; + if (messageHeader.opCode !== OP_COMPRESSED) { + const messageBody = message.slice(MESSAGE_HEADER_SIZE); + stream.emit( + 'message', + new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) + ); + + callback(); + return; + } + + messageHeader.fromCompressed = true; + messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE); + messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4); + const compressorID = message[MESSAGE_HEADER_SIZE + 8]; + const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9); + + decompress(compressorID, compressedBuffer, (err, messageBody) => { + if (err) { + callback(err); + return; + } + + if (messageBody.length !== messageHeader.length) { + callback( + new MongoError( + 'Decompressing a compressed message from the server failed. The message is corrupt.' + ) + ); + + return; + } + + stream.emit( + 'message', + new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) + ); + + callback(); + }); +} + +module.exports = MessageStream; diff --git a/package.json b/package.json index 47a560ddbf..693607c789 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ "bson-ext": "^2.0.0" }, "dependencies": { + "bl": "^4.0.0", "bson": "^1.1.1", "require_optional": "^1.0.1", "safe-buffer": "^5.1.2" diff --git a/test/unit/cmap/message_stream_tests.js b/test/unit/cmap/message_stream_tests.js new file mode 100644 index 0000000000..bb5d6128f7 --- /dev/null +++ b/test/unit/cmap/message_stream_tests.js @@ -0,0 +1,120 @@ +'use strict'; +const BSON = require('bson'); +const Readable = require('stream').Readable; +const Writable = require('stream').Writable; +const MessageStream = require('../../../lib/core/cmap/message_stream'); +const Msg = require('../../../lib/core/connection/msg').Msg; +const expect = require('chai').expect; + +function bufferToStream(buffer) { + const stream = new Readable(); + stream.push(buffer); + stream.push(null); + return stream; +} + +function streamToBuffer(stream) { + return new Promise((resolve, reject) => { + let buffers = []; + stream.on('error', reject); + stream.on('data', data => buffers.push(data)); + stream.on('end', () => resolve(Buffer.concat(buffers))); + }); +} + +describe('Message Stream', function() { + describe('reading', function() { + [ + { + description: 'valid OP_REPLY', + data: Buffer.from( + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ), + documents: [{ ismaster: 1 }] + }, + { + description: 'valid OP_MSG', + data: Buffer.from( + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', + 'hex' + ), + documents: [{ $db: 'admin', ismaster: 1 }] + }, + { + description: 'Invalid message size (negative)', + data: Buffer.from('ffffffff', 'hex'), + error: 'Invalid message size: -1' + }, + { + description: 'Invalid message size (exceeds maximum)', + data: Buffer.from('01000004', 'hex'), + error: 'Invalid message size: 67108865, max allowed: 67108864' + } + ].forEach(test => { + it(test.description, function(done) { + const bson = new BSON(); + const error = test.error; + const inputStream = bufferToStream(test.data); + const messageStream = new MessageStream({ bson }); + + messageStream.on('message', msg => { + if (error) { + done(new Error(`expected error: ${error}`)); + return; + } + + msg.parse(); + + if (test.documents) { + expect(msg).to.have.property('documents'); + expect(msg.documents).to.eql(test.documents); + } + + done(); + }); + + messageStream.on('error', err => { + if (error == null) { + done(err); + return; + } + + expect(err.message).to.equal(error); + done(); + }); + + inputStream.pipe(messageStream); + }); + }); + }); + + describe('writing', function() { + it('should write a message to the stream', function(done) { + const readableStream = new Readable({ read() {} }); + const writeableStream = new Writable({ + write: (chunk, _, callback) => { + readableStream.push(chunk); + callback(); + } + }); + + readableStream.on('data', data => { + expect(data.toString('hex')).to.eql( + '370000000300000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + ); + + done(); + }); + + const bson = new BSON(); + const messageStream = new MessageStream({ bson }); + messageStream.pipe(writeableStream); + + const command = new Msg(bson, 'admin.$cmd', { ismaster: 1 }, {}); + messageStream.writeMessage(command, null, err => { + done(err); + }); + }); + }); +});