feat: add MessageStream for streamed wire protocol messaging
This is a duplex stream that can read and write MongoDB wire
protocol messages, with optional compression. This brings in a
dependency on the `bl` module.
mbroadst committed Nov 12, 2019
1 parent ce60476 commit 8c44044
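For orientation, here is a minimal usage sketch (not part of the commit) showing how the new class is intended to be wired up; `socket` is assumed to be a connected `net.Socket`, and the `agreedCompressor` option only applies once compression has been negotiated with the server:

const BSON = require('bson');
const MessageStream = require('./lib/core/cmap/message_stream');
const Msg = require('./lib/core/connection/msg').Msg;

const bson = new BSON();
const messageStream = new MessageStream({ bson });

// inbound bytes are framed (and decompressed if needed) and surfaced as
// parsed server responses via the 'message' event
socket.pipe(messageStream);
messageStream.on('message', response => {
  response.parse();
  console.log(response.documents);
});

// outbound frames are readable from the stream and flow back to the server
messageStream.pipe(socket);
const command = new Msg(bson, 'admin.$cmd', { ismaster: 1 }, {});
messageStream.writeCommand(command, { agreedCompressor: 'zlib' }, err => {
  // in this initial version the callback only fires if compression fails
  if (err) console.error(err);
});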
Showing 3 changed files with 298 additions and 0 deletions.
177 changes: 177 additions & 0 deletions lib/core/cmap/message_stream.js
@@ -0,0 +1,177 @@
'use strict';

const Duplex = require('stream').Duplex;
const BufferList = require('bl');
const MongoParseError = require('../error').MongoParseError;
const decompress = require('../wireprotocol/compression').decompress;
const Response = require('../connection/commands').Response;
const BinMsg = require('../connection/msg').BinMsg;
const MongoError = require('../error').MongoError;
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED;
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG;
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE;
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
const opcodes = require('../wireprotocol/shared').opcodes;
const compress = require('../wireprotocol/compression').compress;
const compressorIDs = require('../wireprotocol/compression').compressorIDs;
const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
const Msg = require('../connection/msg').Msg;

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
const kBuffer = Symbol('buffer');

/**
* A duplex stream that is capable of reading and writing raw wire protocol messages, with
* support for optional compression
*/
class MessageStream extends Duplex {
constructor(options) {
options = options || {};
super(options);

this.bson = options.bson;
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

this[kBuffer] = new BufferList();
}

_write(chunk, _, callback) {
this[kBuffer].append(chunk);

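// each wire message starts with a 4-byte little-endian total length;
// keep processing while at least a length prefix is buffered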
while (this[kBuffer].length >= 4) {
const sizeOfMessage = this[kBuffer].readInt32LE(0);
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
}

if (sizeOfMessage > this.maxBsonMessageSize) {
callback(
new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}`
)
);
return;
}

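// the rest of this message has not arrived yet; wait for more chunks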
if (sizeOfMessage > this[kBuffer].length) {
callback();
return;
}

const messageBuffer = this[kBuffer].slice(0, sizeOfMessage);
processMessage(this, messageBuffer, callback);
this[kBuffer].consume(sizeOfMessage);
}
}

_read(/* size */) {
// NOTE: This implementation is empty because we explicitly push data to be read
// when `writeCommand` is called.
return;
}

writeCommand(command, options, callback) {
// TODO: agreed compressor should live in `StreamDescription`
const shouldCompress = options && !!options.agreedCompressor;
if (!shouldCompress || !canCompress(command)) {
this.push(Buffer.concat(command.toBin()));
return;
}

// otherwise, compress the message
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBin());
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

// Extract information needed for OP_COMPRESSED from the uncompressed message
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

// Compress the message body
compress({ options }, messageToBeCompressed, (err, compressedMessage) => {
if (err) {
callback(err, null);
return;
}

// Create the msgHeader of OP_COMPRESSED
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
msgHeader.writeInt32LE(
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
0
); // messageLength
msgHeader.writeInt32LE(command.requestId, 4); // requestID
msgHeader.writeInt32LE(0, 8); // responseTo (zero)
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode

// Create the compression details of OP_COMPRESSED
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // size of the uncompressed message, excluding the message header
compressionDetails.writeUInt8(compressorIDs[options.agreedCompressor], 8); // compressorID

this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
});
}
}

// Returns whether a command can be compressed, i.e. whether it contains
// no uncompressible command terms
function canCompress(command) {
const commandDoc = command instanceof Msg ? command.command : command.query;
const commandName = Object.keys(commandDoc)[0];
return uncompressibleCommands.indexOf(commandName) === -1;
}

function processMessage(stream, message, callback) {
const messageHeader = {
messageLength: message.readInt32LE(0),
requestId: message.readInt32LE(4),
responseTo: message.readInt32LE(8),
opCode: message.readInt32LE(12)
};

const ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
const responseOptions = stream.responseOptions;
if (messageHeader.opCode !== OP_COMPRESSED) {
const messageBody = message.slice(MESSAGE_HEADER_SIZE);
stream.emit(
'message',
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
);

callback();
return;
}

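// OP_COMPRESSED: the original opcode, uncompressed size and compressor id
// follow the standard header, then the compressed body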
messageHeader.fromCompressed = true;
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
const compressorID = message[MESSAGE_HEADER_SIZE + 8];
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);

decompress(compressorID, compressedBuffer, (err, messageBody) => {
if (err) {
callback(err);
return;
}

if (messageBody.length !== messageHeader.length) {
callback(
new MongoError(
'Decompressing a compressed message from the server failed. The message is corrupt.'
)
);

return;
}

stream.emit(
'message',
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
);

callback();
});
}

module.exports = MessageStream;
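For reference, the OP_COMPRESSED frame written by `writeCommand` and consumed by `processMessage` above has the following byte layout (a sketch derived from the offsets in this file; `describeCompressedFrame` is a hypothetical helper, not part of the commit):

// byte offsets within an OP_COMPRESSED frame
//   0..3   messageLength     (int32 LE, size of the whole frame)
//   4..7   requestID         (int32 LE)
//   8..11  responseTo        (int32 LE, zero for requests)
//  12..15  opCode            (int32 LE, OP_COMPRESSED = 2012)
//  16..19  originalOpcode    (int32 LE, e.g. OP_MSG = 2013)
//  20..23  uncompressedSize  (int32 LE, original body size, header excluded)
//  24      compressorID      (uint8)
//  25..    compressed message body
function describeCompressedFrame(buffer) {
  return {
    messageLength: buffer.readInt32LE(0),
    requestId: buffer.readInt32LE(4),
    responseTo: buffer.readInt32LE(8),
    opCode: buffer.readInt32LE(12),
    originalOpcode: buffer.readInt32LE(16),
    uncompressedSize: buffer.readInt32LE(20),
    compressorId: buffer.readUInt8(24)
  };
}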
1 change: 1 addition & 0 deletions package.json
@@ -24,6 +24,7 @@
"bson-ext": "^2.0.0"
},
"dependencies": {
"bl": "^4.0.0",
"bson": "^1.1.1",
"require_optional": "^1.0.1",
"safe-buffer": "^5.1.2"
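The new `bl` dependency provides the BufferList that `_write` uses to accumulate socket chunks until a full wire message is available; a standalone sketch of that append/peek/slice/consume pattern (the hex payload here is made up for illustration):

const BufferList = require('bl');

const pending = new BufferList();

// chunks from the socket may split a message arbitrarily
pending.append(Buffer.from('0c000000', 'hex'));         // length prefix: 12 bytes total
pending.append(Buffer.from('0100000002000000', 'hex')); // remaining 8 bytes of the message

while (pending.length >= 4) {
  const sizeOfMessage = pending.readInt32LE(0); // peek the length without copying
  if (pending.length < sizeOfMessage) break;    // wait for the rest of the message
  const message = pending.slice(0, sizeOfMessage);
  pending.consume(sizeOfMessage);               // drop the consumed bytes from the front
  // ... hand `message` off for parsing
}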
120 changes: 120 additions & 0 deletions test/unit/cmap/message_stream_tests.js
@@ -0,0 +1,120 @@
'use strict';
const BSON = require('bson');
const Readable = require('stream').Readable;
const Writable = require('stream').Writable;
const MessageStream = require('../../../lib/core/cmap/message_stream');
const Msg = require('../../../lib/core/connection/msg').Msg;
const expect = require('chai').expect;

function bufferToStream(buffer) {
const stream = new Readable();
stream.push(buffer);
stream.push(null);
return stream;
}

function streamToBuffer(stream) {
return new Promise((resolve, reject) => {
let buffers = [];
stream.on('error', reject);
stream.on('data', data => buffers.push(data));
stream.on('end', () => resolve(Buffer.concat(buffers)));
});
}

describe('Message Stream', function() {
describe('reading', function() {
[
{
description: 'valid OP_REPLY',
data: Buffer.from(
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
),
documents: [{ ismaster: 1 }]
},
{
description: 'valid OP_MSG',
data: Buffer.from(
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
'hex'
),
documents: [{ $db: 'admin', ismaster: 1 }]
},
{
description: 'Invalid message size (negative)',
data: Buffer.from('ffffffff', 'hex'),
error: 'Invalid message size: -1'
},
{
description: 'Invalid message size (exceeds maximum)',
data: Buffer.from('01000004', 'hex'),
error: 'Invalid message size: 67108865, max allowed: 67108864'
}
].forEach(test => {
it(test.description, function(done) {
const bson = new BSON();
const error = test.error;
const inputStream = bufferToStream(test.data);
const messageStream = new MessageStream({ bson });

messageStream.on('message', msg => {
if (error) {
done(new Error(`expected error: ${error}`));
return;
}

msg.parse();

if (test.documents) {
expect(msg).to.have.property('documents');
expect(msg.documents).to.eql(test.documents);
}

done();
});

messageStream.on('error', err => {
if (error == null) {
done(err);
return;
}

expect(err.message).to.equal(error);
done();
});

inputStream.pipe(messageStream);
});
});
});

describe('writing', function() {
it('should write a message to the stream', function(done) {
const readableStream = new Readable({ read() {} });
const writeableStream = new Writable({
write: (chunk, _, callback) => {
readableStream.push(chunk);
callback();
}
});

readableStream.on('data', data => {
expect(data.toString('hex')).to.eql(
'370000000300000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000'
);

done();
});

const bson = new BSON();
const messageStream = new MessageStream({ bson });
messageStream.pipe(writeableStream);

const command = new Msg(bson, 'admin.$cmd', { ismaster: 1 }, {});
messageStream.writeCommand(command, null, err => {
done(err);
});
});
});
});
