-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add
MessageStream
for streamed wire protocol messaging
This is a duplex stream that can read and write MongoDB wire protocol messages, with optional compression. This bring in a dependency on the `bl` module.
- Loading branch information
Showing
3 changed files
with
298 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
'use strict'; | ||
|
||
const Duplex = require('stream').Duplex; | ||
const BufferList = require('bl'); | ||
const MongoParseError = require('../error').MongoParseError; | ||
const decompress = require('../wireprotocol/compression').decompress; | ||
const Response = require('../connection/commands').Response; | ||
const BinMsg = require('../connection/msg').BinMsg; | ||
const MongoError = require('../error').MongoError; | ||
const OP_COMPRESSED = require('../wireprotocol/shared').opcodes.OP_COMPRESSED; | ||
const OP_MSG = require('../wireprotocol/shared').opcodes.OP_MSG; | ||
const MESSAGE_HEADER_SIZE = require('../wireprotocol/shared').MESSAGE_HEADER_SIZE; | ||
const COMPRESSION_DETAILS_SIZE = require('../wireprotocol/shared').COMPRESSION_DETAILS_SIZE; | ||
const opcodes = require('../wireprotocol/shared').opcodes; | ||
const compress = require('../wireprotocol/compression').compress; | ||
const compressorIDs = require('../wireprotocol/compression').compressorIDs; | ||
const uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; | ||
const Msg = require('../connection/msg').Msg; | ||
|
||
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4; | ||
const kBuffer = Symbol('buffer'); | ||
|
||
/** | ||
* A duplex stream that is capable of reading and writing raw wire protocol messages, with | ||
* support for optional compression | ||
*/ | ||
class MessageStream extends Duplex { | ||
constructor(options) { | ||
options = options || {}; | ||
super(options); | ||
|
||
this.bson = options.bson; | ||
this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize; | ||
|
||
this[kBuffer] = new BufferList(); | ||
} | ||
|
||
_write(chunk, _, callback) { | ||
this[kBuffer].append(chunk); | ||
|
||
while (this[kBuffer].length >= 4) { | ||
const sizeOfMessage = this[kBuffer].readInt32LE(0); | ||
if (sizeOfMessage < 0) { | ||
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); | ||
return; | ||
} | ||
|
||
if (sizeOfMessage > this.maxBsonMessageSize) { | ||
callback( | ||
new MongoParseError( | ||
`Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}` | ||
) | ||
); | ||
return; | ||
} | ||
|
||
if (sizeOfMessage > this[kBuffer].length) { | ||
callback(); | ||
return; | ||
} | ||
|
||
const messageBuffer = this[kBuffer].slice(0, sizeOfMessage); | ||
processMessage(this, messageBuffer, callback); | ||
this[kBuffer].consume(sizeOfMessage); | ||
} | ||
} | ||
|
||
_read(/* size */) { | ||
// NOTE: This implementation is empty because we explicitly push data to be read | ||
// when `writeMessage` is called. | ||
return; | ||
} | ||
|
||
writeCommand(command, options, callback) { | ||
// TODO: agreed compressor should live in `StreamDescription` | ||
const shouldCompress = options && !!options.agreedCompressor; | ||
if (!shouldCompress || !canCompress(command)) { | ||
this.push(Buffer.concat(command.toBin())); | ||
return; | ||
} | ||
|
||
// otherwise, compress the message | ||
const concatenatedOriginalCommandBuffer = Buffer.concat(command.toBind()); | ||
const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE); | ||
|
||
// Extract information needed for OP_COMPRESSED from the uncompressed message | ||
const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12); | ||
|
||
// Compress the message body | ||
compress({ options }, messageToBeCompressed, (err, compressedMessage) => { | ||
if (err) { | ||
callback(err, null); | ||
return; | ||
} | ||
|
||
// Create the msgHeader of OP_COMPRESSED | ||
const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE); | ||
msgHeader.writeInt32LE( | ||
MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length, | ||
0 | ||
); // messageLength | ||
msgHeader.writeInt32LE(command.requestId, 4); // requestID | ||
msgHeader.writeInt32LE(0, 8); // responseTo (zero) | ||
msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode | ||
|
||
// Create the compression details of OP_COMPRESSED | ||
const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE); | ||
compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode | ||
compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader | ||
compressionDetails.writeUInt8(compressorIDs[options.agreedCompressor], 8); // compressorID | ||
|
||
this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage])); | ||
}); | ||
} | ||
} | ||
|
||
// Return whether a command contains an uncompressible command term | ||
// Will return true if command contains no uncompressible command terms | ||
function canCompress(command) { | ||
const commandDoc = command instanceof Msg ? command.command : command.query; | ||
const commandName = Object.keys(commandDoc)[0]; | ||
return uncompressibleCommands.indexOf(commandName) === -1; | ||
} | ||
|
||
function processMessage(stream, message, callback) { | ||
const messageHeader = { | ||
messageLength: message.readInt32LE(0), | ||
requestId: message.readInt32LE(4), | ||
responseTo: message.readInt32LE(8), | ||
opCode: message.readInt32LE(12) | ||
}; | ||
|
||
const ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response; | ||
const responseOptions = stream.responseOptions; | ||
if (messageHeader.opCode !== OP_COMPRESSED) { | ||
const messageBody = message.slice(MESSAGE_HEADER_SIZE); | ||
stream.emit( | ||
'message', | ||
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) | ||
); | ||
|
||
callback(); | ||
return; | ||
} | ||
|
||
messageHeader.fromCompressed = true; | ||
messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE); | ||
messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4); | ||
const compressorID = message[MESSAGE_HEADER_SIZE + 8]; | ||
const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9); | ||
|
||
decompress(compressorID, compressedBuffer, (err, messageBody) => { | ||
if (err) { | ||
callback(err); | ||
return; | ||
} | ||
|
||
if (messageBody.length !== messageHeader.length) { | ||
callback( | ||
new MongoError( | ||
'Decompressing a compressed message from the server failed. The message is corrupt.' | ||
) | ||
); | ||
|
||
return; | ||
} | ||
|
||
stream.emit( | ||
'message', | ||
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) | ||
); | ||
|
||
callback(); | ||
}); | ||
} | ||
|
||
module.exports = MessageStream; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
'use strict'; | ||
const BSON = require('bson'); | ||
const Readable = require('stream').Readable; | ||
const Writable = require('stream').Writable; | ||
const MessageStream = require('../../../lib/core/cmap/message_stream'); | ||
const Msg = require('../../../lib/core/connection/msg').Msg; | ||
const expect = require('chai').expect; | ||
|
||
function bufferToStream(buffer) { | ||
const stream = new Readable(); | ||
stream.push(buffer); | ||
stream.push(null); | ||
return stream; | ||
} | ||
|
||
function streamToBuffer(stream) { | ||
return new Promise((resolve, reject) => { | ||
let buffers = []; | ||
stream.on('error', reject); | ||
stream.on('data', data => buffers.push(data)); | ||
stream.on('end', () => resolve(Buffer.concat(buffers))); | ||
}); | ||
} | ||
|
||
describe('Message Stream', function() { | ||
describe('reading', function() { | ||
[ | ||
{ | ||
description: 'valid OP_REPLY', | ||
data: Buffer.from( | ||
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', | ||
'hex' | ||
), | ||
documents: [{ ismaster: 1 }] | ||
}, | ||
{ | ||
description: 'valid OP_MSG', | ||
data: Buffer.from( | ||
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', | ||
'hex' | ||
), | ||
documents: [{ $db: 'admin', ismaster: 1 }] | ||
}, | ||
{ | ||
description: 'Invalid message size (negative)', | ||
data: Buffer.from('ffffffff', 'hex'), | ||
error: 'Invalid message size: -1' | ||
}, | ||
{ | ||
description: 'Invalid message size (exceeds maximum)', | ||
data: Buffer.from('01000004', 'hex'), | ||
error: 'Invalid message size: 67108865, max allowed: 67108864' | ||
} | ||
].forEach(test => { | ||
it(test.description, function(done) { | ||
const bson = new BSON(); | ||
const error = test.error; | ||
const inputStream = bufferToStream(test.data); | ||
const messageStream = new MessageStream({ bson }); | ||
|
||
messageStream.on('message', msg => { | ||
if (error) { | ||
done(new Error(`expected error: ${error}`)); | ||
return; | ||
} | ||
|
||
msg.parse(); | ||
|
||
if (test.documents) { | ||
expect(msg).to.have.property('documents'); | ||
expect(msg.documents).to.eql(test.documents); | ||
} | ||
|
||
done(); | ||
}); | ||
|
||
messageStream.on('error', err => { | ||
if (error == null) { | ||
done(err); | ||
return; | ||
} | ||
|
||
expect(err.message).to.equal(error); | ||
done(); | ||
}); | ||
|
||
inputStream.pipe(messageStream); | ||
}); | ||
}); | ||
}); | ||
|
||
describe('writing', function() { | ||
it('should write a message to the stream', function(done) { | ||
const readableStream = new Readable({ read() {} }); | ||
const writeableStream = new Writable({ | ||
write: (chunk, _, callback) => { | ||
readableStream.push(chunk); | ||
callback(); | ||
} | ||
}); | ||
|
||
readableStream.on('data', data => { | ||
expect(data.toString('hex')).to.eql( | ||
'370000000300000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' | ||
); | ||
|
||
done(); | ||
}); | ||
|
||
const bson = new BSON(); | ||
const messageStream = new MessageStream({ bson }); | ||
messageStream.pipe(writeableStream); | ||
|
||
const command = new Msg(bson, 'admin.$cmd', { ismaster: 1 }, {}); | ||
messageStream.writeMessage(command, null, err => { | ||
done(err); | ||
}); | ||
}); | ||
}); | ||
}); |