Skip to content

Commit

Permalink
refactor: move find command building all into the find operation (#2643)
Browse files Browse the repository at this point in the history
One of the changes made in merging the core and native drivers was
to merge the wire protocol methods for executing find operations.
A vestigial piece of this was that the find command was being
built in the wire protocol layer, rather than the operation layer.
This patch teases these two apart, so that the `query` wire
protocol method only creates and executes an `OP_QUERY` message,
and the modern and legacy find commands are built in the find
operation's definition itself.

NODE-2900
  • Loading branch information
mbroadst authored Dec 2, 2020
1 parent 577d6eb commit 58e4252
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 353 deletions.
13 changes: 10 additions & 3 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@ import { StreamDescription, StreamDescriptionOptions } from './stream_descriptio
import * as wp from './wire_protocol';
import { CommandStartedEvent, CommandFailedEvent, CommandSucceededEvent } from './events';
import { updateSessionFromResponse } from '../sessions';
import { uuidV4, ClientMetadata, now, calculateDurationInMs, Callback } from '../utils';
import {
uuidV4,
ClientMetadata,
now,
calculateDurationInMs,
Callback,
MongoDBNamespace
} from '../utils';
import {
MongoError,
MongoNetworkError,
Expand All @@ -23,7 +30,7 @@ import type { GetMoreOptions } from './wire_protocol/get_more';
import type { InsertOptions, UpdateOptions, RemoveOptions } from './wire_protocol/index';
import type { Stream } from './connect';
import type { LoggerOptions } from '../logger';
import type { FindOptions } from '../operations/find';
import type { QueryOptions } from './wire_protocol/query';

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -246,7 +253,7 @@ export class Connection extends EventEmitter {
wp.command(makeServerTrampoline(this), ns, cmd, options as CommandOptions, callback);
}

query(ns: string, cmd: Document, options: FindOptions, callback: Callback): void {
query(ns: MongoDBNamespace, cmd: Document, options: QueryOptions, callback: Callback): void {
wp.query(makeServerTrampoline(this), ns, cmd, options, callback);
}

Expand Down
276 changes: 66 additions & 210 deletions src/cmap/wire_protocol/query.ts
Original file line number Diff line number Diff line change
@@ -1,175 +1,40 @@
import { command, CommandOptions } from './command';
import { Query } from '../commands';
import { MongoError } from '../../error';
import { maxWireVersion, collectionNamespace, Callback, decorateWithExplain } from '../../utils';
import { getReadPreference, isSharded, applyCommonQueryOptions } from './shared';
import { Document, pluckBSONSerializeOptions } from '../../bson';
import { OpQueryOptions, Query } from '../commands';
import type { Callback, MongoDBNamespace } from '../../utils';
import { BSONSerializeOptions, Document, pluckBSONSerializeOptions } from '../../bson';
import type { Server } from '../../sdam/server';
import type { ReadPreferenceLike } from '../../read_preference';
import type { FindOptions } from '../../operations/find';
import { Explain } from '../../explain';
import { ReadPreference } from '../../read_preference';

/** @internal */
export interface QueryOptions extends CommandOptions {
readPreference?: ReadPreferenceLike;
export interface QueryOptions extends BSONSerializeOptions {
readPreference: ReadPreference;
documentsReturnedIn?: string;
batchSize?: number;
limit?: number;
skip?: number;
projection?: Document;
tailable?: boolean;
awaitData?: boolean;
noCursorTimeout?: boolean;
/** @deprecated use `noCursorTimeout` instead */
timeout?: boolean;
partial?: boolean;
oplogReplay?: boolean;
}

export function query(
server: Server,
ns: string,
cmd: Document,
options: FindOptions,
ns: MongoDBNamespace,
findCommand: Document,
options: QueryOptions,
callback: Callback
): void {
options = options || {};

if (cmd == null) {
return callback(new MongoError(`command ${JSON.stringify(cmd)} does not return a cursor`));
}

if (maxWireVersion(server) < 4) {
const query = prepareLegacyFindQuery(server, ns, cmd, options);
const queryOptions = applyCommonQueryOptions(
{},
Object.assign(options, { ...pluckBSONSerializeOptions(options) })
);

queryOptions.fullResult = true;
if (typeof query.documentsReturnedIn === 'string') {
queryOptions.documentsReturnedIn = query.documentsReturnedIn;
}

server.s.pool.write(query, queryOptions, callback);
return;
}

const readPreference = getReadPreference(cmd, options);
let findCmd = prepareFindCommand(server, ns, cmd);

// If we have explain, we need to rewrite the find command
// to wrap it in the explain command
const explain = Explain.fromOptions(options);
if (explain) {
findCmd = decorateWithExplain(findCmd, explain);
}

// NOTE: This actually modifies the passed in cmd, and our code _depends_ on this
// side-effect. Change this ASAP
cmd.virtual = false;

const commandOptions = Object.assign(
{
documentsReturnedIn: 'firstBatch',
numberToReturn: 1,
slaveOk: readPreference.slaveOk()
},
options
);

command(server, ns, findCmd, commandOptions, callback);
}

function prepareFindCommand(server: Server, ns: string, cmd: Document) {
const findCmd: Document = {
find: collectionNamespace(ns)
};

if (cmd.query) {
if (cmd.query['$query']) {
findCmd.filter = cmd.query['$query'];
} else {
findCmd.filter = cmd.query;
}
}

let sortValue = cmd.sort;
if (Array.isArray(sortValue)) {
const sortObject: Document = {};

if (sortValue.length > 0 && !Array.isArray(sortValue[0])) {
let sortDirection = sortValue[1];
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

sortObject[sortValue[0]] = sortDirection;
} else {
for (let i = 0; i < sortValue.length; i++) {
let sortDirection = sortValue[i][1];
if (sortDirection === 'asc') {
sortDirection = 1;
} else if (sortDirection === 'desc') {
sortDirection = -1;
}

sortObject[sortValue[i][0]] = sortDirection;
}
}

sortValue = sortObject;
}

if (typeof cmd.allowDiskUse === 'boolean') {
findCmd.allowDiskUse = cmd.allowDiskUse;
}

if (cmd.sort) findCmd.sort = sortValue;
if (cmd.fields) findCmd.projection = cmd.fields;
if (cmd.hint) findCmd.hint = cmd.hint;
if (cmd.skip) findCmd.skip = cmd.skip;
if (cmd.limit) findCmd.limit = cmd.limit;
if (cmd.limit < 0) {
findCmd.limit = Math.abs(cmd.limit);
findCmd.singleBatch = true;
}

if (typeof cmd.batchSize === 'number') {
if (cmd.batchSize < 0) {
if (cmd.limit !== 0 && Math.abs(cmd.batchSize) < Math.abs(cmd.limit)) {
findCmd.limit = Math.abs(cmd.batchSize);
}

findCmd.singleBatch = true;
}

findCmd.batchSize = Math.abs(cmd.batchSize);
}

if (cmd.comment) findCmd.comment = cmd.comment;
if (cmd.maxScan) findCmd.maxScan = cmd.maxScan;
if (cmd.maxTimeMS) findCmd.maxTimeMS = cmd.maxTimeMS;
if (cmd.min) findCmd.min = cmd.min;
if (cmd.max) findCmd.max = cmd.max;
findCmd.returnKey = cmd.returnKey ? cmd.returnKey : false;
findCmd.showRecordId = cmd.showDiskLoc ? cmd.showDiskLoc : false;
if (cmd.snapshot) findCmd.snapshot = cmd.snapshot;
if (cmd.tailable) findCmd.tailable = cmd.tailable;
if (cmd.oplogReplay) findCmd.oplogReplay = cmd.oplogReplay;
if (cmd.noCursorTimeout) findCmd.noCursorTimeout = cmd.noCursorTimeout;
if (cmd.awaitData) findCmd.awaitData = cmd.awaitData;
if (cmd.awaitdata) findCmd.awaitData = cmd.awaitdata;
if (cmd.partial) findCmd.partial = cmd.partial;
if (cmd.collation) findCmd.collation = cmd.collation;
if (cmd.readConcern) findCmd.readConcern = cmd.readConcern;

return findCmd;
}

function prepareLegacyFindQuery(
server: Server,
ns: string,
cmd: Document,
options: FindOptions
): Query {
options = options || {};

const readPreference = getReadPreference(cmd, options);
const batchSize = cmd.batchSize || options.batchSize || 0;
const limit = cmd.limit || options.limit;
const numberToSkip = cmd.skip || options.skip || 0;

const isExplain = typeof findCommand.$explain !== 'undefined';
const readPreference = options.readPreference ?? ReadPreference.primary;
const batchSize = options.batchSize || 0;
const limit = options.limit;
const numberToSkip = options.skip || 0;
let numberToReturn = 0;
if (
limit &&
Expand All @@ -180,66 +45,57 @@ function prepareLegacyFindQuery(
numberToReturn = batchSize;
}

const findCmd: Document = {};
if (isSharded(server) && readPreference) {
findCmd['$readPreference'] = readPreference.toJSON();
if (isExplain) {
// nToReturn must be 0 (match all) or negative (match N and close cursor)
// nToReturn > 0 will give explain results equivalent to limit(0)
numberToReturn = -Math.abs(limit || 0);
}

if (cmd.sort) findCmd['$orderby'] = cmd.sort;
if (cmd.hint) findCmd['$hint'] = cmd.hint;
if (cmd.snapshot) findCmd['$snapshot'] = cmd.snapshot;
if (typeof cmd.returnKey !== 'undefined') findCmd['$returnKey'] = cmd.returnKey;
if (cmd.maxScan) findCmd['$maxScan'] = cmd.maxScan;
if (cmd.min) findCmd['$min'] = cmd.min;
if (cmd.max) findCmd['$max'] = cmd.max;
if (typeof cmd.showDiskLoc !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showDiskLoc;
} else if (typeof cmd.showRecordId !== 'undefined') {
findCmd['$showDiskLoc'] = cmd.showRecordId;
const queryOptions: OpQueryOptions = {
numberToSkip,
numberToReturn,
pre32Limit: typeof limit === 'number' ? limit : undefined,
checkKeys: false,
slaveOk: readPreference.slaveOk()
};

if (options.projection) {
queryOptions.returnFieldSelector = options.projection;
}

if (cmd.comment) findCmd['$comment'] = cmd.comment;
if (cmd.maxTimeMS) findCmd['$maxTimeMS'] = cmd.maxTimeMS;
if (options.explain) {
// nToReturn must be 0 (match all) or negative (match N and close cursor)
// nToReturn > 0 will give explain results equivalent to limit(0)
numberToReturn = -Math.abs(cmd.limit || 0);
findCmd['$explain'] = true;
const query = new Query(ns.toString(), findCommand, queryOptions);
if (typeof options.tailable === 'boolean') {
query.tailable = options.tailable;
}

findCmd['$query'] = cmd.query;
if (cmd.readConcern && cmd.readConcern.level !== 'local') {
throw new MongoError(
`server find command does not support a readConcern level of ${cmd.readConcern.level}`
);
if (typeof options.oplogReplay === 'boolean') {
query.oplogReplay = options.oplogReplay;
}

if (cmd.readConcern) {
cmd = Object.assign({}, cmd);
delete cmd['readConcern'];
if (typeof options.timeout === 'boolean') {
query.noCursorTimeout = options.timeout;
} else if (typeof options.noCursorTimeout === 'boolean') {
query.noCursorTimeout = options.noCursorTimeout;
}

const serializeFunctions =
typeof options.serializeFunctions === 'boolean' ? options.serializeFunctions : false;
const ignoreUndefined =
typeof options.ignoreUndefined === 'boolean' ? options.ignoreUndefined : false;
if (typeof options.awaitData === 'boolean') {
query.awaitData = options.awaitData;
}

const query = new Query(ns, findCmd, {
numberToSkip,
numberToReturn,
pre32Limit: typeof limit === 'number' ? limit : undefined,
checkKeys: false,
returnFieldSelector: cmd.fields,
serializeFunctions,
ignoreUndefined
});
if (typeof options.partial === 'boolean') {
query.partial = options.partial;
}

if (typeof cmd.tailable === 'boolean') query.tailable = cmd.tailable;
if (typeof cmd.oplogReplay === 'boolean') query.oplogReplay = cmd.oplogReplay;
if (typeof cmd.noCursorTimeout === 'boolean') query.noCursorTimeout = cmd.noCursorTimeout;
if (typeof cmd.awaitData === 'boolean') query.awaitData = cmd.awaitData;
if (typeof cmd.partial === 'boolean') query.partial = cmd.partial;
server.s.pool.write(
query,
{ fullResult: true, ...pluckBSONSerializeOptions(options) },
(err, result) => {
if (err || !result) return callback(err, result);
if (isExplain && result.documents && result.documents[0]) {
return callback(undefined, result.documents[0]);
}

query.slaveOk = readPreference.slaveOk();
return query;
callback(undefined, result);
}
);
}
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ export type {
UpdateOptions as WireUpdateOptions,
RemoveOptions as WireRemoveOptions
} from './cmap/wire_protocol/index';
export type { QueryOptions } from './cmap/wire_protocol/query';
export type { CollationOptions, WriteCommandOptions } from './cmap/wire_protocol/write_command';
export type { CollectionPrivate, CollectionOptions } from './collection';
export type { AggregationCursorOptions } from './cursor/aggregation_cursor';
Expand Down
Loading

0 comments on commit 58e4252

Please sign in to comment.