Skip to content

Commit e13038d

Browse files
baileympearsonclemclxnbbeeken
authored
fix(NODE-5316): prevent parallel topology creation in MongoClient.connect (#3696)
Co-authored-by: Clément Cloux <clement.cloux@ynov.com> Co-authored-by: Neal Beeken <neal.beeken@mongodb.com>
1 parent 261199f commit e13038d

File tree

2 files changed

+108
-35
lines changed

2 files changed

+108
-35
lines changed

src/mongo_client.ts

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
343343
topology?: Topology;
344344
/** @internal */
345345
readonly mongoLogger: MongoLogger;
346+
/** @internal */
347+
private connectionLock?: Promise<this>;
346348

347349
/**
348350
* The consolidate, parsed, transformed and merged options.
@@ -447,54 +449,66 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
447449
}
448450

449451
return maybeCallback(async () => {
450-
if (this.topology && this.topology.isConnected()) {
452+
if (this.connectionLock) {
453+
return this.connectionLock;
454+
}
455+
try {
456+
this.connectionLock = this._connect();
457+
await this.connectionLock;
451458
return this;
459+
} finally {
460+
this.connectionLock = undefined;
452461
}
462+
}, callback);
463+
}
453464

454-
const options = this[kOptions];
465+
private async _connect(): Promise<this> {
466+
if (this.topology && this.topology.isConnected()) {
467+
return this;
468+
}
455469

456-
if (typeof options.srvHost === 'string') {
457-
const hosts = await resolveSRVRecord(options);
470+
const options = this[kOptions];
458471

459-
for (const [index, host] of hosts.entries()) {
460-
options.hosts[index] = host;
461-
}
472+
if (typeof options.srvHost === 'string') {
473+
const hosts = await resolveSRVRecord(options);
474+
475+
for (const [index, host] of hosts.entries()) {
476+
options.hosts[index] = host;
462477
}
478+
}
463479

464-
const topology = new Topology(options.hosts, options);
465-
// Events can be emitted before initialization is complete so we have to
466-
// save the reference to the topology on the client ASAP if the event handlers need to access it
467-
this.topology = topology;
468-
topology.client = this;
480+
const topology = new Topology(options.hosts, options);
481+
// Events can be emitted before initialization is complete so we have to
482+
// save the reference to the topology on the client ASAP if the event handlers need to access it
483+
this.topology = topology;
484+
topology.client = this;
469485

470-
topology.once(Topology.OPEN, () => this.emit('open', this));
486+
topology.once(Topology.OPEN, () => this.emit('open', this));
471487

472-
for (const event of MONGO_CLIENT_EVENTS) {
473-
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
474-
}
488+
for (const event of MONGO_CLIENT_EVENTS) {
489+
topology.on(event, (...args: any[]): unknown => this.emit(event, ...(args as any)));
490+
}
475491

476-
const topologyConnect = async () => {
477-
try {
478-
await promisify(callback => topology.connect(options, callback))();
479-
} catch (error) {
480-
topology.close({ force: true });
481-
throw error;
482-
}
483-
};
484-
485-
if (this.autoEncrypter) {
486-
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
487-
await initAutoEncrypter();
488-
await topologyConnect();
489-
await options.encrypter.connectInternalClient();
490-
} else {
491-
await topologyConnect();
492+
const topologyConnect = async () => {
493+
try {
494+
await promisify(callback => topology.connect(options, callback))();
495+
} catch (error) {
496+
topology.close({ force: true });
497+
throw error;
492498
}
499+
};
493500

494-
return this;
495-
}, callback);
496-
}
501+
if (this.autoEncrypter) {
502+
const initAutoEncrypter = promisify(callback => this.autoEncrypter?.init(callback));
503+
await initAutoEncrypter();
504+
await topologyConnect();
505+
await options.encrypter.connectInternalClient();
506+
} else {
507+
await topologyConnect();
508+
}
497509

510+
return this;
511+
}
498512
/**
499513
* Close the db and its underlying connections
500514
*

test/integration/node-specific/mongo_client.test.ts

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,65 @@ describe('class MongoClient', function () {
516516
);
517517
});
518518

519+
context('concurrent #connect()', () => {
520+
let client: MongoClient;
521+
let topologyOpenEvents;
522+
523+
/** Keep track number of call to client connect to close as many as connect (otherwise leak_checker hook will failed) */
524+
let clientConnectCounter: number;
525+
526+
/**
527+
* Wrap the connect method of the client to keep track
528+
* of number of times connect is called
529+
*/
530+
async function clientConnect() {
531+
if (!client) {
532+
return;
533+
}
534+
clientConnectCounter++;
535+
return client.connect();
536+
}
537+
538+
beforeEach(async function () {
539+
client = this.configuration.newClient();
540+
topologyOpenEvents = [];
541+
clientConnectCounter = 0;
542+
client.on('open', event => topologyOpenEvents.push(event));
543+
});
544+
545+
afterEach(async function () {
546+
// close `clientConnectCounter` times
547+
const clientClosePromises = Array.from({ length: clientConnectCounter }, () =>
548+
client.close()
549+
);
550+
await Promise.all(clientClosePromises);
551+
});
552+
553+
it('parallel client connect calls only create one topology', async function () {
554+
await Promise.all([clientConnect(), clientConnect(), clientConnect()]);
555+
556+
expect(topologyOpenEvents).to.have.lengthOf(1);
557+
expect(client.topology?.isConnected()).to.be.true;
558+
});
559+
560+
it('when connect rejects lock is released regardless', async function () {
561+
const internalConnectStub = sinon.stub(client, '_connect' as keyof MongoClient);
562+
internalConnectStub.onFirstCall().rejects(new Error('cannot connect'));
563+
564+
// first call rejected to simulate a connection failure
565+
const error = await clientConnect().catch(error => error);
566+
expect(error).to.match(/cannot connect/);
567+
568+
internalConnectStub.restore();
569+
570+
// second call should connect
571+
await clientConnect();
572+
573+
expect(topologyOpenEvents).to.have.lengthOf(1);
574+
expect(client.topology?.isConnected()).to.be.true;
575+
});
576+
});
577+
519578
context('#close()', () => {
520579
let client: MongoClient;
521580
let db: Db;

0 commit comments

Comments
 (0)