Skip to content

Commit

Permalink
fix(NODE-3074): update estimated document count for v1 api (#2764)
Browse files Browse the repository at this point in the history
The estimatedDocumentCount operation now uses an aggregate command
with a $collStats stage against 5.0+ servers.
  • Loading branch information
emadum authored Apr 7, 2021
1 parent d477e2e commit 146791c
Show file tree
Hide file tree
Showing 39 changed files with 2,602 additions and 403 deletions.
3 changes: 1 addition & 2 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,6 @@ export class Collection {
callback?: Callback<number>
): Promise<number> | void {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
new EstimatedDocumentCountOperation(this, resolveOptions(this, options)),
Expand Down Expand Up @@ -1430,7 +1429,7 @@ export class Collection {
query = query || {};
return executeOperation(
getTopology(this),
new EstimatedDocumentCountOperation(this, query, resolveOptions(this, options)),
new CountDocumentsOperation(this, query, resolveOptions(this, options)),
callback
);
}
Expand Down
64 changes: 31 additions & 33 deletions src/operations/estimated_document_count.ts
Original file line number Diff line number Diff line change
@@ -1,62 +1,60 @@
import { Aspect, defineAspects, Hint } from './operation';
import { Aspect, defineAspects } from './operation';
import { CommandOperation, CommandOperationOptions } from './command';
import type { Callback } from '../utils';
import { Callback, maxWireVersion } from '../utils';
import type { Document } from '../bson';
import type { Server } from '../sdam/server';
import type { Collection } from '../collection';
import type { ClientSession } from '../sessions';
import type { MongoError } from '../error';

/** @public */
export interface EstimatedDocumentCountOptions extends CommandOperationOptions {
skip?: number;
limit?: number;
hint?: Hint;
/**
* The maximum amount of time to allow the operation to run.
*
* This option is sent only if the caller explicitly provides a value. The default is to not send a value.
*/
maxTimeMS?: number;
}

/** @internal */
export class EstimatedDocumentCountOperation extends CommandOperation<number> {
options: EstimatedDocumentCountOptions;
collectionName: string;
query?: Document;

constructor(collection: Collection, options: EstimatedDocumentCountOptions);
constructor(collection: Collection, query: Document, options: EstimatedDocumentCountOptions);
constructor(
collection: Collection,
query?: Document | EstimatedDocumentCountOptions,
options?: EstimatedDocumentCountOptions
) {
if (typeof options === 'undefined') {
options = query as EstimatedDocumentCountOptions;
query = undefined;
}

constructor(collection: Collection, options: EstimatedDocumentCountOptions = {}) {
super(collection, options);
this.options = options;
this.collectionName = collection.collectionName;
if (query) {
this.query = query;
}
}

execute(server: Server, session: ClientSession, callback: Callback<number>): void {
const options = this.options;
const cmd: Document = { count: this.collectionName };

if (this.query) {
cmd.query = this.query;
if (maxWireVersion(server) < 12) {
return this.executeLegacy(server, session, callback);
}
const pipeline = [{ $collStats: { count: {} } }, { $group: { _id: 1, n: { $sum: '$count' } } }];

if (typeof options.skip === 'number') {
cmd.skip = options.skip;
}
const cmd: Document = { aggregate: this.collectionName, pipeline, cursor: {} };

if (typeof options.limit === 'number') {
cmd.limit = options.limit;
if (typeof this.options.maxTimeMS === 'number') {
cmd.maxTimeMS = this.options.maxTimeMS;
}

if (options.hint) {
cmd.hint = options.hint;
super.executeCommand(server, session, cmd, (err, response) => {
if (err && (err as MongoError).code !== 26) {
callback(err);
return;
}

callback(undefined, response?.cursor?.firstBatch[0]?.n || 0);
});
}

executeLegacy(server: Server, session: ClientSession, callback: Callback<number>): void {
const cmd: Document = { count: this.collectionName };

if (typeof this.options.maxTimeMS === 'number') {
cmd.maxTimeMS = this.options.maxTimeMS;
}

super.executeCommand(server, session, cmd, (err, response) => {
Expand Down
1 change: 1 addition & 0 deletions test/functional/apm.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ describe('APM', function () {
}

loadSpecTests('apm').forEach(scenario => {
if (scenario.name === 'command') return; // FIXME(NODE-3074): remove when `count` spec tests have been fixed
describe(scenario.name, function () {
scenario.tests.forEach(test => {
const requirements = { topology: ['single', 'replicaset', 'sharded'] };
Expand Down
19 changes: 19 additions & 0 deletions test/functional/crud_spec.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ const TestRunnerContext = require('./spec-runner').TestRunnerContext;
const gatherTestSuites = require('./spec-runner').gatherTestSuites;
const generateTopologyTests = require('./spec-runner').generateTopologyTests;

const { loadSpecTests } = require('../spec/index');
const { runUnifiedTest } = require('./unified-spec-runner/runner');

function enforceServerVersionLimits(requires, scenario) {
const versionLimits = [];
if (scenario.minServerVersion) {
Expand Down Expand Up @@ -433,3 +436,19 @@ describe('CRUD v2', function () {

generateTopologyTests(testSuites, testContext);
});

describe('CRUD unified', function () {
for (const crudSpecTest of loadSpecTests('crud/unified')) {
expect(crudSpecTest).to.exist;
context(String(crudSpecTest.description), function () {
for (const test of crudSpecTest.tests) {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
await runUnifiedTest(this, crudSpecTest, test);
}
});
}
});
}
});
2 changes: 1 addition & 1 deletion test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3594,7 +3594,7 @@ describe('Cursor', function () {
}
});

it('Correctly decorate the collection cursor count command with skip, limit, hint, readConcern', {
it.skip('Correctly decorate the collection count command with skip, limit, hint, readConcern', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
metadata: {
Expand Down
3 changes: 2 additions & 1 deletion test/functional/retryable_reads.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ describe('Retryable Reads', function () {
spec.description.match(/listCollections/i) ||
spec.description.match(/listCollectionNames/i) ||
spec.description.match(/estimatedDocumentCount/i) ||
spec.description.match(/count/i) ||
// FIXME(NODE-3074): uncomment when `count` spec tests have been fixed
// spec.description.match(/count/i) ||
spec.description.match(/find/i)
);
});
Expand Down
3 changes: 2 additions & 1 deletion test/functional/transactions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ describe('Transactions', function () {
'commitTransaction retry succeeds on new mongos',
'commitTransaction retry fails on new mongos',
'unpin after transient error within a transaction and commit',
// 'count',
// FIXME(NODE-3074): unskip count tests when spec tests have been updated
'count',
// This test needs there to be multiple mongoses
// 'increment txnNumber',
// Skipping this until SPEC-1320 is resolved
Expand Down
21 changes: 14 additions & 7 deletions test/functional/unified-spec-runner/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ interface UnifiedChangeStream extends ChangeStream {

export type CommandEvent = CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent;

function serverApiConfig() {
if (process.env.MONGODB_API_VERSION) {
return { version: process.env.MONGODB_API_VERSION };
}
}

function getClient(address) {
const serverApi = serverApiConfig();
return new MongoClient(`mongodb://${address}`, serverApi ? { serverApi } : {});
}

export class UnifiedMongoClient extends MongoClient {
events: CommandEvent[];
failPoints: Document[];
Expand All @@ -43,11 +54,7 @@ export class UnifiedMongoClient extends MongoClient {
super(url, {
monitorCommands: true,
...description.uriOptions,
serverApi: description.serverApi
? description.serverApi
: process.env.MONGODB_API_VERSION
? { version: process.env.MONGODB_API_VERSION }
: null
serverApi: description.serverApi ? description.serverApi : serverApiConfig()
});
this.events = [];
this.failPoints = [];
Expand Down Expand Up @@ -93,7 +100,7 @@ export class FailPointMap extends Map<string, Document> {
} else {
// create a new client
address = addressOrClient.toString();
client = new MongoClient(`mongodb://${address}`);
client = getClient(address);
await client.connect();
}

Expand All @@ -114,7 +121,7 @@ export class FailPointMap extends Map<string, Document> {
const entries = Array.from(this.entries());
await Promise.all(
entries.map(async ([hostAddress, configureFailPoint]) => {
const client = new MongoClient(`mongodb://${hostAddress}`);
const client = getClient(hostAddress);
await client.connect();
const admin = client.db('admin');
const result = await admin.command({ configureFailPoint, mode: 'off' });
Expand Down
4 changes: 2 additions & 2 deletions test/functional/unified-spec-runner/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { ReadPreference } from '../../../src/read_preference';
import { WriteConcern } from '../../../src/write_concern';
import { Document, InsertOneOptions } from '../../../src';
import { EventCollector } from '../../tools/utils';
import { EntitiesMap, UnifiedMongoClient } from './entities';
import { EntitiesMap } from './entities';
import { expectErrorCheck, resultCheck } from './match';
import type { OperationDescription } from './schema';
import { CommandStartedEvent } from '../../../src/cmap/command_monitoring_events';
Expand Down Expand Up @@ -384,7 +384,7 @@ operations.set('distinct', async ({ entities, operation }) => {

operations.set('estimatedDocumentCount', async ({ entities, operation }) => {
const collection = entities.getEntity('collection', operation.object);
return collection.estimatedDocumentCount();
return collection.estimatedDocumentCount(operation.arguments);
});

operations.set('findOneAndDelete', async ({ entities, operation }) => {
Expand Down
12 changes: 1 addition & 11 deletions test/functional/versioned-api.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,7 @@ describe('Versioned API', function () {
it(String(test.description), {
metadata: { sessions: { skipLeakTests: true } },
test: async function () {
try {
await runUnifiedTest(this, versionedApiTest, test);
} catch (error) {
if (error.message.includes('not implemented.')) {
console.log(`${test.description}: was skipped due to missing functionality`);
console.log(error.stack);
this.skip();
} else {
throw error;
}
}
await runUnifiedTest(this, versionedApiTest, test);
}
});
}
Expand Down
Loading

0 comments on commit 146791c

Please sign in to comment.