Skip to content

Commit

Permalink
fix(NODE-4074): ensure getTopology doesn't throw synchronously (#3172)
Browse files Browse the repository at this point in the history
  • Loading branch information
baileympearson authored Mar 21, 2022
1 parent 799689e commit 329f081
Show file tree
Hide file tree
Showing 17 changed files with 160 additions and 90 deletions.
16 changes: 6 additions & 10 deletions src/admin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
ValidateCollectionOperation,
ValidateCollectionOptions
} from './operations/validate_collection';
import { Callback, getTopology } from './utils';
import type { Callback } from './utils';

/** @internal */
export interface AdminPrivate {
Expand Down Expand Up @@ -83,7 +83,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
getTopology(this.s.db),
this.s.db,
new RunCommandOperation(this.s.db, command, options),
callback
);
Expand Down Expand Up @@ -207,7 +207,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
getTopology(this.s.db),
this.s.db,
new AddUserOperation(this.s.db, username, password, options),
callback
);
Expand All @@ -233,7 +233,7 @@ export class Admin {
options = Object.assign({ dbName: 'admin' }, options);

return executeOperation(
getTopology(this.s.db),
this.s.db,
new RemoveUserOperation(this.s.db, username, options),
callback
);
Expand Down Expand Up @@ -263,7 +263,7 @@ export class Admin {
options = options ?? {};

return executeOperation(
getTopology(this.s.db),
this.s.db,
new ValidateCollectionOperation(this, collectionName, options),
callback
);
Expand All @@ -286,11 +286,7 @@ export class Admin {
if (typeof options === 'function') (callback = options), (options = {});
options = options ?? {};

return executeOperation(
getTopology(this.s.db),
new ListDatabasesOperation(this.s.db, options),
callback
);
return executeOperation(this.s.db, new ListDatabasesOperation(this.s.db, options), callback);
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/bulk/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,19 +655,19 @@ function executeCommands(
try {
if (isInsertBatch(batch)) {
executeOperation(
bulkOperation.s.topology,
bulkOperation.s.collection,
new InsertOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
);
} else if (isUpdateBatch(batch)) {
executeOperation(
bulkOperation.s.topology,
bulkOperation.s.collection,
new UpdateOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
);
} else if (isDeleteBatch(batch)) {
executeOperation(
bulkOperation.s.topology,
bulkOperation.s.collection,
new DeleteOperation(bulkOperation.s.namespace, batch.operations, finalOptions),
resultHandler
);
Expand Down Expand Up @@ -1285,7 +1285,7 @@ export abstract class BulkOperationBase {
const finalOptions = { ...this.s.options, ...options };
const operation = new BulkWriteShimOperation(this, finalOptions);

return executeOperation(this.s.topology, operation, callback);
return executeOperation(this.s.collection, operation, callback);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ export class ChangeStreamCursor<TSchema extends Document = Document> extends Abs
session
});

executeOperation(this.topology, aggregateOperation, (err, response) => {
executeOperation(session, aggregateOperation, (err, response) => {
if (err || response == null) {
return callback(err);
}
Expand Down
56 changes: 28 additions & 28 deletions src/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ export class Collection<TSchema extends Document = Document> {
}

return executeOperation(
getTopology(this),
this,
new InsertOneOperation(
this as TODO_NODE_3286,
doc,
Expand Down Expand Up @@ -338,7 +338,7 @@ export class Collection<TSchema extends Document = Document> {
options = options ? Object.assign({}, options) : { ordered: true };

return executeOperation(
getTopology(this),
this,
new InsertManyOperation(
this as TODO_NODE_3286,
docs,
Expand Down Expand Up @@ -406,7 +406,7 @@ export class Collection<TSchema extends Document = Document> {
}

return executeOperation(
getTopology(this),
this,
new BulkWriteOperation(
this as TODO_NODE_3286,
operations as TODO_NODE_3286,
Expand Down Expand Up @@ -453,7 +453,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new UpdateOneOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -501,7 +501,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new ReplaceOneOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -549,7 +549,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new UpdateManyOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -583,7 +583,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new DeleteOneOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)),
callback
);
Expand Down Expand Up @@ -623,7 +623,7 @@ export class Collection<TSchema extends Document = Document> {
}

return executeOperation(
getTopology(this),
this,
new DeleteManyOperation(this as TODO_NODE_3286, filter, resolveOptions(this, options)),
callback
);
Expand Down Expand Up @@ -652,7 +652,7 @@ export class Collection<TSchema extends Document = Document> {

// Intentionally, we do not inherit options from parent for this operation.
return executeOperation(
getTopology(this),
this,
new RenameOperation(this as TODO_NODE_3286, newName, {
...options,
readPreference: ReadPreference.PRIMARY
Expand All @@ -679,7 +679,7 @@ export class Collection<TSchema extends Document = Document> {
options = options ?? {};

return executeOperation(
getTopology(this),
this,
new DropCollectionOperation(this.s.db, this.collectionName, options),
callback
);
Expand Down Expand Up @@ -783,7 +783,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new OptionsOperation(this as TODO_NODE_3286, resolveOptions(this, options)),
callback
);
Expand All @@ -806,7 +806,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new IsCappedOperation(this as TODO_NODE_3286, resolveOptions(this, options)),
callback
);
Expand Down Expand Up @@ -857,7 +857,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new CreateIndexOperation(
this as TODO_NODE_3286,
this.collectionName,
Expand Down Expand Up @@ -918,7 +918,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options.maxTimeMS !== 'number') delete options.maxTimeMS;

return executeOperation(
getTopology(this),
this,
new CreateIndexesOperation(
this as TODO_NODE_3286,
this.collectionName,
Expand Down Expand Up @@ -952,7 +952,7 @@ export class Collection<TSchema extends Document = Document> {
options.readPreference = ReadPreference.primary;

return executeOperation(
getTopology(this),
this,
new DropIndexOperation(this as TODO_NODE_3286, indexName, options),
callback
);
Expand All @@ -975,7 +975,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new DropIndexesOperation(this as TODO_NODE_3286, resolveOptions(this, options)),
callback
);
Expand Down Expand Up @@ -1013,7 +1013,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new IndexExistsOperation(this as TODO_NODE_3286, indexes, resolveOptions(this, options)),
callback
);
Expand All @@ -1036,7 +1036,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new IndexInformationOperation(this.s.db, this.collectionName, resolveOptions(this, options)),
callback
);
Expand All @@ -1058,7 +1058,7 @@ export class Collection<TSchema extends Document = Document> {
): Promise<number> | void {
if (typeof options === 'function') (callback = options), (options = {});
return executeOperation(
getTopology(this),
this,
new EstimatedDocumentCountOperation(this as TODO_NODE_3286, resolveOptions(this, options)),
callback
);
Expand Down Expand Up @@ -1118,7 +1118,7 @@ export class Collection<TSchema extends Document = Document> {

filter ??= {};
return executeOperation(
getTopology(this),
this,
new CountDocumentsOperation(
this as TODO_NODE_3286,
filter as Document,
Expand Down Expand Up @@ -1193,7 +1193,7 @@ export class Collection<TSchema extends Document = Document> {

filter ??= {};
return executeOperation(
getTopology(this),
this,
new DistinctOperation(
this as TODO_NODE_3286,
key as TODO_NODE_3286,
Expand Down Expand Up @@ -1221,7 +1221,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new IndexesOperation(this as TODO_NODE_3286, resolveOptions(this, options)),
callback
);
Expand All @@ -1245,7 +1245,7 @@ export class Collection<TSchema extends Document = Document> {
options = options ?? {};

return executeOperation(
getTopology(this),
this,
new CollStatsOperation(this as TODO_NODE_3286, options),
callback
);
Expand Down Expand Up @@ -1277,7 +1277,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new FindOneAndDeleteOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -1324,7 +1324,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new FindOneAndReplaceOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -1372,7 +1372,7 @@ export class Collection<TSchema extends Document = Document> {
if (typeof options === 'function') (callback = options), (options = {});

return executeOperation(
getTopology(this),
this,
new FindOneAndUpdateOperation(
this as TODO_NODE_3286,
filter,
Expand Down Expand Up @@ -1495,7 +1495,7 @@ export class Collection<TSchema extends Document = Document> {
}

return executeOperation(
getTopology(this),
this,
new MapReduceOperation(
this as TODO_NODE_3286,
map,
Expand Down Expand Up @@ -1636,7 +1636,7 @@ export class Collection<TSchema extends Document = Document> {

filter ??= {};
return executeOperation(
getTopology(this),
this,
new CountOperation(
MongoDBNamespace.fromString(this.namespace),
filter,
Expand Down
2 changes: 1 addition & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ export abstract class AbstractCursor<
batchSize
});

executeOperation(this.topology, getMoreOperation, callback);
executeOperation(this, getMoreOperation, callback);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class AggregationCursor<TSchema = Document> extends AbstractCursor<TSchem
session
});

executeOperation(this.topology, aggregateOperation, (err, response) => {
executeOperation(this, aggregateOperation, (err, response) => {
if (err || response == null) return callback(err);

// TODO: NODE-2882
Expand All @@ -88,7 +88,7 @@ export class AggregationCursor<TSchema = Document> extends AbstractCursor<TSchem
if (verbosity == null) verbosity = true;

return executeOperation(
this.topology,
this,
new AggregateOperation(this.namespace, this[kPipeline], {
...this[kOptions], // NOTE: order matters here, we may need to refine this
...this.cursorOptions,
Expand Down
Loading

0 comments on commit 329f081

Please sign in to comment.