Skip to content

Commit

Permalink
perf(NODE-5771): improve new connection (#3948)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Dec 19, 2023
1 parent 735f7aa commit a4776cf
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 163 deletions.
201 changes: 84 additions & 117 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { on } from 'stream';
import { type Readable, Transform, type TransformCallback } from 'stream';
import { clearTimeout, setTimeout } from 'timers';
import { promisify } from 'util';

Expand Down Expand Up @@ -61,6 +61,7 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { MessageStream, type OperationDescription } from './message_stream';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -807,17 +808,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
/** @internal */
authContext?: AuthContext;

/**@internal */
delayedTimeoutId: NodeJS.Timeout | null = null;
/** @internal */
[kDescription]: StreamDescription;
/** @internal */
[kGeneration]: number;
/** @internal */
[kLastUseTime]: number;
/** @internal */
socket: Stream;
controller: AbortController;

private socket: Stream;
private controller: AbortController;
private messageStream: Readable;
private socketWrite: (buffer: Uint8Array) => Promise<void>;

/** @internal */
[kHello]: Document | null;
/** @internal */
Expand Down Expand Up @@ -857,9 +860,18 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {

this.socket = stream;
this.controller = new AbortController();
this.socket.on('error', this.onError.bind(this));

this.messageStream = this.socket
.on('error', this.onError.bind(this))
.pipe(new SizedMessageTransform({ connection: this }))
.on('error', this.onError.bind(this));
this.socket.on('close', this.onClose.bind(this));
this.socket.on('timeout', this.onTimeout.bind(this));

const socketWrite = promisify(this.socket.write.bind(this.socket));
this.socketWrite = async buffer => {
return abortable(socketWrite(buffer), { signal: this.controller.signal });
};
}

async commandAsync(...args: Parameters<typeof this.command>) {
Expand Down Expand Up @@ -1060,23 +1072,19 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}

try {
await writeCommand(this, message, {
await this.writeCommand(message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal: this.controller.signal
zlibCompressionLevel: this.description.zlibCompressionLevel
});

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

if (options.noResponse) {
yield { ok: 1 };
return;
}

this.controller.signal.throwIfAborted();

for await (const response of readMany(this, { signal: this.controller.signal })) {
for await (const response of this.readMany()) {
this.socket.setTimeout(0);
response.parse(options);

Expand All @@ -1094,9 +1102,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

Expand Down Expand Up @@ -1214,121 +1219,83 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
};
exhaustLoop().catch(replyListener);
}
}

const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;

/**
* @internal
*
* This helper reads chucks of data out of a socket and buffers them until it has received a
* full wire protocol message.
*
* By itself, produces an infinite async generator of wire protocol messages and consumers must end
* the stream by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readWireProtocolMessages(
connection: ModernConnection,
{ signal }: { signal?: AbortSignal } = {}
): AsyncGenerator<Buffer> {
const bufferPool = new BufferPool();
const maxBsonMessageSize = connection.hello?.maxBsonMessageSize ?? kDefaultMaxBsonMessageSize;
for await (const [chunk] of on(connection.socket, 'data', { signal })) {
if (connection.delayedTimeoutId) {
clearTimeout(connection.delayedTimeoutId);
connection.delayedTimeoutId = null;
}

bufferPool.append(chunk);
const sizeOfMessage = bufferPool.getInt32();
/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
async writeCommand(
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>>
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});

if (sizeOfMessage == null) {
continue;
}
const buffer = Buffer.concat(await finalCommand.toBin());

if (sizeOfMessage < 0) {
throw new MongoParseError(`Invalid message size: ${sizeOfMessage}`);
}
return this.socketWrite(buffer);
}

if (sizeOfMessage > maxBsonMessageSize) {
throw new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${maxBsonMessageSize}`
);
}
/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
async *readMany(): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of onData(this.messageStream, { signal: this.controller.signal })) {
const response = await decompressResponse(message);
yield response;

if (sizeOfMessage > bufferPool.length) {
continue;
if (!response.moreToCome) {
return;
}
}

yield bufferPool.read(sizeOfMessage);
}
}

/**
* @internal
*
* Writes an OP_MSG or OP_QUERY request to the socket, optionally compressing the command. This method
* waits until the socket's buffer has emptied (the Nodejs socket `drain` event has fired).
*/
export async function writeCommand(
connection: ModernConnection,
command: WriteProtocolMessageType,
options: Partial<Pick<OperationDescription, 'agreedCompressor' | 'zlibCompressionLevel'>> & {
signal?: AbortSignal;
}
): Promise<void> {
const finalCommand =
options.agreedCompressor === 'none' || !OpCompressedRequest.canCompress(command)
? command
: new OpCompressedRequest(command, {
agreedCompressor: options.agreedCompressor ?? 'none',
zlibCompressionLevel: options.zlibCompressionLevel ?? 0
});
/** @internal */
export class SizedMessageTransform extends Transform {
bufferPool: BufferPool;
connection: ModernConnection;

constructor({ connection }: { connection: ModernConnection }) {
super({ objectMode: false });
this.bufferPool = new BufferPool();
this.connection = connection;
}
override _transform(chunk: Buffer, encoding: unknown, callback: TransformCallback): void {
if (this.connection.delayedTimeoutId != null) {
clearTimeout(this.connection.delayedTimeoutId);
this.connection.delayedTimeoutId = null;
}

const buffer = Buffer.concat(await finalCommand.toBin());
this.bufferPool.append(chunk);
const sizeOfMessage = this.bufferPool.getInt32();

const socketWriteFn = promisify(connection.socket.write.bind(connection.socket));
if (sizeOfMessage == null) {
return callback();
}

return abortable(socketWriteFn(buffer), options);
}
if (sizeOfMessage < 0) {
return callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}, too small`));
}

/**
* @internal
*
* Returns an async generator that yields full wire protocol messages from the underlying socket. This function
* yields messages until `moreToCome` is false or not present in a response, or the caller cancels the request
* by calling `return` on the generator.
*
* Note that `for-await` loops call `return` automatically when the loop is exited.
*/
export async function* readMany(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): AsyncGenerator<OpMsgResponse | OpQueryResponse> {
for await (const message of readWireProtocolMessages(connection, options)) {
const response = await decompressResponse(message);
yield response;

if (!response.moreToCome) {
return;
if (sizeOfMessage > this.bufferPool.length) {
return callback();
}
}
}

/**
* @internal
*
* Reads a single wire protocol message out of a connection.
*/
export async function read(
connection: ModernConnection,
options: { signal?: AbortSignal } = {}
): Promise<OpMsgResponse | OpQueryResponse> {
for await (const value of readMany(connection, options)) {
return value;
const message = this.bufferPool.read(sizeOfMessage);
return callback(null, message);
}

throw new MongoRuntimeError('unable to read message off of connection');
}
Loading

0 comments on commit a4776cf

Please sign in to comment.