From b1f296ffb435b97320d91703796647b968aa4d4e Mon Sep 17 00:00:00 2001 From: Dan Aprahamian Date: Fri, 23 Feb 2018 15:17:31 -0500 Subject: [PATCH] fix(sessions): move active session tracking to topology base (#1665) Moves the tracking of active sessions to the topology base. Doing this allows us to ensure that all active and pooled sessions are ended when the topology closes, and that implicit sessions are tracked. Also adds a test case to make sure none of our unit tests are leaking sessions, and corrects many leaky tests. Also bumps version of mongodb-core Part of HELP-5384 --- .eslintrc | 3 +- lib/mongo_client.js | 16 +- lib/topologies/mongos.js | 2 + lib/topologies/replset.js | 19 +- lib/topologies/server.js | 2 + lib/topologies/topology_base.js | 20 +- package.json | 3 +- test/functional/apm_tests.js | 8 + test/functional/crud_api_tests.js | 2 +- test/functional/crud_spec_tests.js | 6 + test/functional/cursor_tests.js | 14 +- test/functional/cursorstream_tests.js | 26 +- test/functional/db_tests.js | 3 + test/functional/gridfs_stream_tests.js | 25 ++ test/functional/index_tests.js | 97 ++------ test/functional/insert_tests.js | 13 +- test/functional/operation_example_tests.js | 11 +- .../operation_generators_example_tests.js | 225 ++++++++---------- .../operation_promises_example_tests.js | 15 +- test/functional/session_leak_test.js | 92 +++++++ .../sharding_read_preference_tests.js | 21 +- test/unit/sessions/client_tests.js | 1 + test/unit/sessions/collection_tests.js | 2 + 23 files changed, 372 insertions(+), 254 deletions(-) create mode 100644 test/functional/session_leak_test.js diff --git a/.eslintrc b/.eslintrc index 2440ddd9083..283970bdeb9 100644 --- a/.eslintrc +++ b/.eslintrc @@ -7,7 +7,8 @@ "mocha": true }, "globals": { - "Promise": true + "Promise": true, + "Set": true }, "parserOptions": { "ecmaVersion": 2017 diff --git a/lib/mongo_client.js b/lib/mongo_client.js index 46813d36e78..04a16375395 100644 --- a/lib/mongo_client.js +++ b/lib/mongo_client.js @@ -327,14 +327,6 @@ MongoClient.prototype.close = function(force, callback) { // Remove listeners after emit self.removeAllListeners('close'); - // If we have sessions, we want to send a single `endSessions` command for them, - // and then individually clean them up. They will be removed from the internal state - // when they emit their `ended` events. - if (this.s.sessions.length) { - this.topology.endSessions(this.s.sessions); - this.s.sessions.forEach(session => session.endSession({ skipCommand: true })); - } - // Callback after next event loop tick if (typeof callback === 'function') return process.nextTick(function() { @@ -507,13 +499,7 @@ MongoClient.prototype.startSession = function(options) { throw new MongoError('Current topology does not support sessions'); } - const session = this.topology.startSession(options); - session.once('ended', () => { - this.s.sessions = this.s.sessions.filter(s => s.equals(session)); - }); - - this.s.sessions.push(session); - return session; + return this.topology.startSession(options); }; var mergeOptions = function(target, source, flatten) { diff --git a/lib/topologies/mongos.js b/lib/topologies/mongos.js index e10168d1c55..b556b36195d 100644 --- a/lib/topologies/mongos.js +++ b/lib/topologies/mongos.js @@ -190,6 +190,8 @@ class Mongos extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: options.promiseLibrary || Promise }; diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index ad46c17df77..63fe02db054 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -206,6 +206,8 @@ class ReplSet extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: options.promiseLibrary || Promise }; @@ -371,22 +373,9 @@ class ReplSet extends TopologyBase { } close(forceClosed) { - var self = this; - // Call destroy on the topology - this.s.coreTopology.destroy({ - force: typeof forceClosed === 'boolean' ? forceClosed : false - }); - - // We need to wash out all stored processes - if (forceClosed === true) { - this.s.storeOptions.force = forceClosed; - this.s.store.flush(); - } + super.close(forceClosed); - var events = ['timeout', 'error', 'close', 'joined', 'left']; - events.forEach(function(e) { - self.removeAllListeners(e); - }); + ['timeout', 'error', 'close', 'joined', 'left'].forEach(e => this.removeAllListeners(e)); } } diff --git a/lib/topologies/server.js b/lib/topologies/server.js index 5823a11961f..cbf4c78ebd2 100644 --- a/lib/topologies/server.js +++ b/lib/topologies/server.js @@ -198,6 +198,8 @@ class Server extends TopologyBase { options: options, // Server Session Pool sessionPool: null, + // Active client sessions + sessions: [], // Promise library promiseLibrary: promiseLibrary || Promise }; diff --git a/lib/topologies/topology_base.js b/lib/topologies/topology_base.js index a5999b8e87d..c1654d10941 100644 --- a/lib/topologies/topology_base.js +++ b/lib/topologies/topology_base.js @@ -290,7 +290,13 @@ class TopologyBase extends EventEmitter { } startSession(options) { - return new ClientSession(this, this.s.sessionPool, options); + const session = new ClientSession(this, this.s.sessionPool, options); + session.once('ended', () => { + this.s.sessions = this.s.sessions.filter(s => !s.equals(session)); + }); + + this.s.sessions.push(session); + return session; } endSessions(sessions, callback) { @@ -388,6 +394,18 @@ class TopologyBase extends EventEmitter { } close(forceClosed) { + // If we have sessions, we want to send a single `endSessions` command for them, + // and then individually clean them up. They will be removed from the internal state + // when they emit their `ended` events. + if (this.s.sessions.length) { + this.endSessions(this.s.sessions.map(session => session.id)); + this.s.sessions.forEach(session => session.endSession({ skipCommand: true })); + } + + if (this.s.sessionPool) { + this.s.sessionPool.endAllPooledSessions(); + } + this.s.coreTopology.destroy({ force: typeof forceClosed === 'boolean' ? forceClosed : false }); diff --git a/package.json b/package.json index 246856de5f7..fce01c145af 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "official" ], "dependencies": { - "mongodb-core": "3.0.2" + "mongodb-core": "3.0.3" }, "devDependencies": { "betterbenchmarks": "^0.1.0", @@ -32,6 +32,7 @@ "mongodb-test-runner": "^1.1.18", "prettier": "^1.5.3", "semver": "5.4.1", + "sinon": "^4.3.0", "worker-farm": "^1.5.0" }, "author": "Christian Kvalheim", diff --git a/test/functional/apm_tests.js b/test/functional/apm_tests.js index 41438dfc8ee..f53a908d7e8 100644 --- a/test/functional/apm_tests.js +++ b/test/functional/apm_tests.js @@ -1036,6 +1036,10 @@ describe('APM', function() { // Get the result result = results.successes.shift(); + if (result.commandName === 'endSessions') { + result = results.successes.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); // Do we have a getMore command @@ -1054,6 +1058,10 @@ describe('APM', function() { results.failures = filterSessionsCommands(results.failures); result = results.failures.shift(); + if (result.commandName === 'endSessions') { + result = results.failures.shift(); + } + // Validate the test expect(commandName).to.equal(result.commandName); } diff --git a/test/functional/crud_api_tests.js b/test/functional/crud_api_tests.js index de6d0246c42..de791c83d48 100644 --- a/test/functional/crud_api_tests.js +++ b/test/functional/crud_api_tests.js @@ -830,7 +830,7 @@ describe('CRUD API', function() { test.equal(null, err); // Delete all items with no selector - db.collection('t6_1').deleteMany(function(err) { + db.collection('t6_1').deleteMany({}, function(err) { test.equal(null, err); client.close(); diff --git a/test/functional/crud_spec_tests.js b/test/functional/crud_spec_tests.js index c8c03552b5d..28a91549429 100644 --- a/test/functional/crud_spec_tests.js +++ b/test/functional/crud_spec_tests.js @@ -39,6 +39,12 @@ describe('CRUD spec', function() { }); }); + afterEach(() => { + if (testContext.client) { + testContext.client.close(); + } + }); + describe('read', function() { readScenarios.forEach(function(scenarioData) { var scenarioName = scenarioData[0]; diff --git a/test/functional/cursor_tests.js b/test/functional/cursor_tests.js index f660e0e7f8c..1b9396d9841 100644 --- a/test/functional/cursor_tests.js +++ b/test/functional/cursor_tests.js @@ -1728,6 +1728,7 @@ describe('Cursor', function() { test.equal(1, items.length); test.equal(2, items[0].a); test.equal(undefined, items[0].x); + client.close(); done(); }); }); @@ -2296,9 +2297,9 @@ describe('Cursor', function() { if (count === 0) { var stream = collection.find({}, { tailable: true, awaitData: true }).stream(); - + // let index = 0; stream.on('data', function() { - // console.log("doc :: " + (index++)); + // console.log('doc :: ' + index++); }); stream.on('error', function(err) { @@ -2319,14 +2320,17 @@ describe('Cursor', function() { // Just hammer the server for (var i = 0; i < 100; i++) { + const id = i; process.nextTick(function() { - collection.insert({ id: i }, function(err) { + collection.insert({ id }, function(err) { test.equal(null, err); + + if (id === 99) { + setTimeout(() => client.close()); + } }); }); } - - setTimeout(() => client.close(), 800); } }); } diff --git a/test/functional/cursorstream_tests.js b/test/functional/cursorstream_tests.js index 7bcd6c29e69..3b30e80e9ac 100644 --- a/test/functional/cursorstream_tests.js +++ b/test/functional/cursorstream_tests.js @@ -70,9 +70,16 @@ describe('Cursor Streams', function() { // When the stream is done stream.on('end', function() { - expect(data).to.have.length(3000); - client.close(); - done(); + setTimeout(() => { + let err; + try { + expect(data).to.have.length(3000); + } catch (e) { + err = e; + } + client.close(); + done(err); + }, 1000); }); } }); @@ -139,9 +146,16 @@ describe('Cursor Streams', function() { // When the stream is done stream.on('end', function() { - expect(data).to.have.length(10000); - client.close(); - done(); + setTimeout(() => { + let err; + try { + expect(data).to.have.length(10000); + } catch (e) { + err = e; + } + client.close(); + done(err); + }, 1000); }); } }); diff --git a/test/functional/db_tests.js b/test/functional/db_tests.js index e2f967801a6..2a118286c7a 100644 --- a/test/functional/db_tests.js +++ b/test/functional/db_tests.js @@ -98,11 +98,13 @@ describe('Db', function() { coll.findOne({}, null, function() { //e - errors b/c findOne needs a query selector test.equal(1, count); + client.close(); done(); }); } catch (e) { process.nextTick(function() { test.equal(1, count); + client.close(); done(); }); } @@ -465,6 +467,7 @@ describe('Db', function() { return c.collectionName; }); test.notEqual(-1, collections.indexOf('node972.test')); + client.close(); done(); }); }); diff --git a/test/functional/gridfs_stream_tests.js b/test/functional/gridfs_stream_tests.js index 134292bed76..46de0ce4ceb 100644 --- a/test/functional/gridfs_stream_tests.js +++ b/test/functional/gridfs_stream_tests.js @@ -81,6 +81,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(indexes.length, 2); test.equal(indexes[1].name, 'files_id_1_n_1'); + client.close(); done(); }); }); @@ -166,6 +167,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(indexes.length, 2); test.equal(indexes[1].name, 'files_id_1_n_1'); + client.close(); done(); }); }); @@ -237,6 +239,7 @@ describe('GridFS Stream', function() { var hash = crypto.createHash('md5'); hash.update(license); test.equal(docs[0].md5, hash.digest('hex')); + client.close(); done(); }); }); @@ -283,6 +286,7 @@ describe('GridFS Stream', function() { downloadStream.on('error', function(err) { test.equal('ENOENT', err.code); + client.close(); client.close(); done(); }); @@ -333,6 +337,7 @@ describe('GridFS Stream', function() { downloadStream.on('end', function() { test.ok(gotData); + client.close(); done(); }); }); @@ -401,6 +406,7 @@ describe('GridFS Stream', function() { // care that we got between 1 and 3, and got the right result test.ok(gotData >= 1 && gotData <= 3); test.equal(str, 'pache'); + client.close(); done(); }); }); @@ -459,6 +465,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -521,6 +528,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -582,6 +590,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -642,6 +651,7 @@ describe('GridFS Stream', function() { downloadStream.on('end', function() { test.equal(downloadStream.s.cursor, null); if (finished.close) { + client.close(); return done(); } finished.end = true; @@ -649,6 +659,7 @@ describe('GridFS Stream', function() { downloadStream.on('close', function() { if (finished.end) { + client.close(); return done(); } finished.close = true; @@ -712,6 +723,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -756,6 +768,7 @@ describe('GridFS Stream', function() { sort: { _id: 1 } }); + client.close(); done(); }); // END @@ -811,6 +824,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -870,6 +884,7 @@ describe('GridFS Stream', function() { test.equal(error, null); test.equal(docs.length, 0); + client.close(); done(); }); }); @@ -918,6 +933,7 @@ describe('GridFS Stream', function() { bucket.find({}, { batchSize: 1 }).toArray(function(err, files) { test.equal(null, err); test.equal(1, files.length); + client.close(); done(); }); }); @@ -965,6 +981,7 @@ describe('GridFS Stream', function() { // Rename the file bucket.rename(id, 'renamed_it.dat', function(err) { expect(err).to.not.exist; + client.close(); done(); }); }); @@ -1009,6 +1026,7 @@ describe('GridFS Stream', function() { // As per spec, make sure we didn't actually fire a query // because the document length is 0 test.equal(stream.s.cursor, null); + client.close(); done(); }); }); @@ -1059,6 +1077,7 @@ describe('GridFS Stream', function() { } if (--num === 0) { + client.close(); done(); } }); @@ -1105,9 +1124,11 @@ describe('GridFS Stream', function() { download.on('error', function(error) { if (!testSpec.assert.error) { test.ok(false); + client.close(); done(); } test.ok(error.toString().indexOf(testSpec.assert.error) !== -1); + client.close(); done(); }); @@ -1115,10 +1136,12 @@ describe('GridFS Stream', function() { var result = testSpec.assert.result; if (!result) { test.ok(false); + client.close(); done(); } test.equal(res.toString('hex'), result.$hex); + client.close(); done(); }); }; @@ -1282,6 +1305,7 @@ describe('GridFS Stream', function() { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function(error) { test.equal(error.toString(), 'Error: Cannot call abort() on a stream twice'); + client.close(); done(); }); }); @@ -1352,6 +1376,7 @@ describe('GridFS Stream', function() { // care that we got between 1 and 3, and got the right result test.ok(gotData >= 1 && gotData <= 3); test.equal(str, 'pache'); + client.close(); done(); }); }); diff --git a/test/functional/index_tests.js b/test/functional/index_tests.js index 6ad31e13ce0..2682a33cc5f 100644 --- a/test/functional/index_tests.js +++ b/test/functional/index_tests.js @@ -270,68 +270,6 @@ describe('Indexes', function() { } }); - /** - * @ignore - */ - it('shouldThrowErrorOnAttemptingSafeCreateIndexWithNoCallback', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - // The actual test we wish to run - test: function(done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - var db = client.db(configuration.db); - db.createCollection('shouldThrowErrorOnAttemptingSafeUpdateWithNoCallback', function( - err, - collection - ) { - try { - // insert a doc - collection.createIndex({ a: 1 }, configuration.writeConcernMax()); - test.ok(false); - } catch (err) {} // eslint-disable-line - - client.close(); - done(); - }); - }); - } - }); - - /** - * @ignore - */ - it('shouldThrowErrorOnAttemptingSafeEnsureIndexWithNoCallback', { - metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } - }, - - // The actual test we wish to run - test: function(done) { - var configuration = this.configuration; - var client = configuration.newClient(configuration.writeConcernMax(), { poolSize: 1 }); - client.connect(function(err, client) { - var db = client.db(configuration.db); - db.createCollection('shouldThrowErrorOnAttemptingSafeUpdateWithNoCallback', function( - err, - collection - ) { - try { - // insert a doc - collection.ensureIndex({ a: 1 }, configuration.writeConcernMax()); - test.ok(false); - } catch (err) {} // eslint-disable-line - - client.close(); - done(); - }); - }); - } - }); - /** * @ignore */ @@ -782,10 +720,16 @@ describe('Indexes', function() { collection.ensureIndex({ a: 1 }, configuration.writeConcernMax(), function(err) { test.equal(null, err); - collection.dropIndex('a_1'); - - client.close(); - done(); + collection + .dropIndex('a_1') + .then(() => { + client.close(); + done(); + }) + .catch(err => { + client.close(); + done(err); + }); }); }); }); @@ -1109,7 +1053,10 @@ describe('Indexes', function() { * @ignore */ it('should correctly error out due to driver close', { - metadata: { requires: { topology: ['single'] } }, + metadata: { + requires: { topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function(done) { @@ -1118,14 +1065,16 @@ describe('Indexes', function() { client.connect(function(err, client) { var db = client.db(configuration.db); client.close(function() { - db.createCollection('nonexisting', { w: 1 }, function(err) { - test.ok(err != null); - db.collection('nonexisting', { strict: true }, function(err) { + setTimeout(() => { + db.createCollection('nonexisting', { w: 1 }, function(err) { test.ok(err != null); - db.collection('nonexisting', { strict: false }, function(err) { - // When set to false (default) it should not create an error - test.ok(err === null); - done(); + db.collection('nonexisting', { strict: true }, function(err) { + test.ok(err != null); + db.collection('nonexisting', { strict: false }, function(err) { + // When set to false (default) it should not create an error + test.ok(err === null); + setTimeout(() => done()); + }); }); }); }); diff --git a/test/functional/insert_tests.js b/test/functional/insert_tests.js index 183fba4560a..e2fa98399fe 100644 --- a/test/functional/insert_tests.js +++ b/test/functional/insert_tests.js @@ -1656,9 +1656,16 @@ describe('Insert', function() { client.connect(function(err, client) { var db = client.db(configuration.db); var collection = db.collection('shouldExecuteInsertWithNoCallbackAndWriteConcern'); - collection.insert({ a: { b: { c: 1 } } }); - client.close(); - done(); + collection.insert({ a: { b: { c: 1 } } }).then( + () => { + client.close(); + done(); + }, + err => { + client.close(); + done(err); + } + ); }); } }); diff --git a/test/functional/operation_example_tests.js b/test/functional/operation_example_tests.js index 903e12e590e..2efb1a0aedc 100644 --- a/test/functional/operation_example_tests.js +++ b/test/functional/operation_example_tests.js @@ -3170,7 +3170,8 @@ describe('Operation Examples', function() { */ it('shouldCorrectlyRenameCollection', { metadata: { - requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] } + requires: { topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger'] }, + sessions: { skipLeakTests: true } }, // The actual test we wish to run @@ -4352,12 +4353,14 @@ describe('Operation Examples', function() { test.ok(result); test.equal(null, err); + const oldClient = client; // Authenticate MongoClient.connect( 'mongodb://user:name@localhost:27017/integration_tests', function(err, client) { expect(err).to.exist; expect(client).to.not.exist; + oldClient.close(); done(); } ); @@ -8970,8 +8973,10 @@ describe('Operation Examples', function() { // When the stream is done stream.on('end', function() { - client.close(); - done(); + setTimeout(() => { + client.close(); + done(); + }, 1000); }); }); }); diff --git a/test/functional/operation_generators_example_tests.js b/test/functional/operation_generators_example_tests.js index ddd2d5ea675..2ca94f17cc1 100644 --- a/test/functional/operation_generators_example_tests.js +++ b/test/functional/operation_generators_example_tests.js @@ -1620,15 +1620,11 @@ describe('Operation (Generators)', function() { // Create a simple single field index yield collection.ensureIndex({ a: 1 }, configuration.writeConcernMax()); - setTimeout(function() { - return co(function*() { - // List all of the indexes on the collection - var indexes = yield collection.indexes(); - test.equal(3, indexes.length); + // List all of the indexes on the collection + var indexes = yield collection.indexes(); + test.equal(3, indexes.length); - client.close(); - }); - }, 1000); + client.close(); }); // END } @@ -1844,17 +1840,11 @@ describe('Operation (Generators)', function() { // BEGIN var collection = db.collection('simple_document_insert_collection_no_safe_with_generators'); // Insert a single document - collection.insertOne({ hello: 'world_no_safe' }); - - // Wait for a second before finishing up, to ensure we have written the item to disk - setTimeout(function() { - return co(function*() { - // Fetch the document - var item = yield collection.findOne({ hello: 'world_no_safe' }); - test.equal('world_no_safe', item.hello); - client.close(); - }); - }, 100); + yield collection.insertOne({ hello: 'world_no_safe' }); + + var item = yield collection.findOne({ hello: 'world_no_safe' }); + test.equal('world_no_safe', item.hello); + client.close(); }); // END } @@ -2346,7 +2336,10 @@ describe('Operation (Generators)', function() { * @ignore */ it('shouldCorrectlyRenameCollectionWithGenerators', { - metadata: { requires: { generators: true, topology: ['single'] } }, + metadata: { + requires: { generators: true, topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function() { @@ -2477,17 +2470,12 @@ describe('Operation (Generators)', function() { // Fetch the collection var collection = db.collection('save_a_simple_document_with_generators'); // Save a document with no safe option - collection.save({ hello: 'world' }); - - // Wait for a second - setTimeout(function() { - return co(function*() { - // Find the saved document - var item = yield collection.findOne({ hello: 'world' }); - test.equal('world', item.hello); - client.close(); - }); - }, 2000); + yield collection.save({ hello: 'world' }); + + // Find the saved document + var item = yield collection.findOne({ hello: 'world' }); + test.equal('world', item && item.hello); + client.close(); }); // END } @@ -2591,19 +2579,14 @@ describe('Operation (Generators)', function() { yield collection.insertOne({ a: 1 }, configuration.writeConcernMax()); // Update the document with an atomic operator - collection.updateOne({ a: 1 }, { $set: { b: 2 } }); + yield collection.updateOne({ a: 1 }, { $set: { b: 2 } }); - // Wait for a second then fetch the document - setTimeout(function() { - return co(function*() { - // Fetch the document that we modified - var item = yield collection.findOne({ a: 1 }); - test.equal(1, item.a); - test.equal(2, item.b); + var item = yield collection.findOne({ a: 1 }); - client.close(); - }); - }, 1000); + test.equal(1, item.a); + test.equal(2, item.b); + + client.close(); }); // END } @@ -2947,54 +2930,52 @@ describe('Operation (Generators)', function() { readPreference: ReadPreference.PRIMARY }); - setTimeout(function() { - return co(function*() { - // Locate the entry - var collection = db.collection('test_eval_with_generators'); - var item = yield collection.findOne(); - test.equal(5, item.y); - tests_done(); - - // Evaluate a function with 2 parameters passed in - var result = yield db.eval('function (x, y) {return x + y;}', [2, 3]); - test.equal(5, result); - tests_done(); - - // Evaluate a function with no parameters passed in - result = yield db.eval('function () {return 5;}'); - test.equal(5, result); - tests_done(); - - // Evaluate a statement - result = yield db.eval('2 + 3;'); - test.equal(5, result); - tests_done(); - - // Evaluate a statement using the code object - result = yield db.eval(new Code('2 + 3;')); - test.equal(5, result); - tests_done(); - - // Evaluate a statement using the code object including a scope - result = yield db.eval(new Code('return i;', { i: 2 })); - test.equal(2, result); - tests_done(); - - // Evaluate a statement using the code object including a scope - result = yield db.eval(new Code('i + 3;', { i: 2 })); - test.equal(5, result); - tests_done(); - - try { - // Evaluate an illegal statement - yield db.eval('5 ++ 5;'); - } catch (err) { - test.ok(err instanceof Error); - test.ok(err.message != null); - tests_done(); - } - }); - }, 1000); + yield new Promise(resolve => setTimeout(resolve, 1000)); + + // Locate the entry + var collection = db.collection('test_eval_with_generators'); + var item = yield collection.findOne(); + test.equal(5, item.y); + tests_done(); + + // Evaluate a function with 2 parameters passed in + result = yield db.eval('function (x, y) {return x + y;}', [2, 3]); + test.equal(5, result); + tests_done(); + + // Evaluate a function with no parameters passed in + result = yield db.eval('function () {return 5;}'); + test.equal(5, result); + tests_done(); + + // Evaluate a statement + result = yield db.eval('2 + 3;'); + test.equal(5, result); + tests_done(); + + // Evaluate a statement using the code object + result = yield db.eval(new Code('2 + 3;')); + test.equal(5, result); + tests_done(); + + // Evaluate a statement using the code object including a scope + result = yield db.eval(new Code('return i;', { i: 2 })); + test.equal(2, result); + tests_done(); + + // Evaluate a statement using the code object including a scope + result = yield db.eval(new Code('i + 3;', { i: 2 })); + test.equal(5, result); + tests_done(); + + try { + // Evaluate an illegal statement + yield db.eval('5 ++ 5;'); + } catch (err) { + test.ok(err instanceof Error); + test.ok(err.message != null); + tests_done(); + } }); // END } @@ -3621,26 +3602,23 @@ describe('Operation (Generators)', function() { yield db.dropDatabase(); // Wait two seconds to let it replicate across - setTimeout(function() { - return co(function*() { - // Get the admin database - var dbs = yield db.admin().listDatabases(); - // Grab the databases - dbs = dbs.databases; - // Did we find the db - var found = false; - - // Check if we have the db in the list - for (var i = 0; i < dbs.length; i++) { - if (dbs[i].name === 'integration_tests_to_drop') found = true; - } + yield new Promise(resolve => setTimeout(resolve, 2000)); + // Get the admin database + var dbs = yield db.admin().listDatabases(); + // Grab the databases + dbs = dbs.databases; + // Did we find the db + var found = false; + + // Check if we have the db in the list + for (var i = 0; i < dbs.length; i++) { + if (dbs[i].name === 'integration_tests_to_drop') found = true; + } - // We should not find the databases - if (process.env['JENKINS'] == null) test.equal(false, found); + // We should not find the databases + if (process.env['JENKINS'] == null) test.equal(false, found); - client.close(); - }); - }, 2000); + client.close(); }); // END } @@ -6560,24 +6538,27 @@ describe('Operation (Generators)', function() { // Insert a document in the capped collection yield collection.insertMany(docs, configuration.writeConcernMax()); - var total = 0; - // Get the cursor - var cursor = collection - .find({}) - .addCursorFlag('tailable', true) - .addCursorFlag('awaitData', true); + yield new Promise(resolve => { + var total = 0; + // Get the cursor + var cursor = collection + .find({}) + .addCursorFlag('tailable', true) + .addCursorFlag('awaitData', true); - cursor.on('data', function() { - total = total + 1; + cursor.on('data', function() { + total = total + 1; - if (total === 1000) { - cursor.kill(); - } - }); + if (total === 1000) { + cursor.kill(); + } + }); - cursor.on('end', function() { - client.close(); + cursor.on('end', function() { + client.close(); + resolve(); + }); }); }); // END diff --git a/test/functional/operation_promises_example_tests.js b/test/functional/operation_promises_example_tests.js index d3d1ed584c6..824469a473f 100644 --- a/test/functional/operation_promises_example_tests.js +++ b/test/functional/operation_promises_example_tests.js @@ -2431,7 +2431,10 @@ describe('Operation (Promises)', function() { * @ignore */ it('shouldCorrectlyRenameCollectionWithPromises', { - metadata: { requires: { promises: true, topology: ['single'] } }, + metadata: { + requires: { promises: true, topology: ['single'] }, + sessions: { skipLeakTests: true } + }, // The actual test we wish to run test: function() { @@ -2538,8 +2541,14 @@ describe('Operation (Promises)', function() { }) .then(function(count) { test.equal(2, count); - client.close(); - }); + }) + .then( + () => client.close(), + e => { + client.close(); + throw e; + } + ); }); // END /* eslint-enable */ diff --git a/test/functional/session_leak_test.js b/test/functional/session_leak_test.js new file mode 100644 index 00000000000..554737e3c95 --- /dev/null +++ b/test/functional/session_leak_test.js @@ -0,0 +1,92 @@ +'use strict'; + +const expect = require('chai').expect; +const sinon = require('sinon'); +const core = require('mongodb-core'); +const MongoClient = require('../../lib/mongo_client'); +const ServerSessionPool = core.Sessions.ServerSessionPool; + +const sandbox = sinon.createSandbox(); + +let activeSessions, pooledSessions, activeSessionsBeforeClose; + +function getSessionLeakMetadata(currentTest) { + return (currentTest.metadata && currentTest.metadata.sessions) || {}; +} + +beforeEach('Session Leak Before Each - Set up clean test environment', () => { + sandbox.restore(); + activeSessions = new Set(); + pooledSessions = new Set(); + activeSessionsBeforeClose = new Set(); +}); + +beforeEach('Session Leak Before Each - setup session tracking', function() { + if (getSessionLeakMetadata(this.currentTest).skipLeakTests) { + return; + } + + const _acquire = ServerSessionPool.prototype.acquire; + sandbox.stub(ServerSessionPool.prototype, 'acquire').callsFake(function() { + const session = _acquire.apply(this, arguments); + activeSessions.add(session.id); + // console.log(`Active + ${JSON.stringify(session.id)} = ${activeSessions.size}`); + return session; + }); + + const _release = ServerSessionPool.prototype.release; + sandbox.stub(ServerSessionPool.prototype, 'release').callsFake(function(session) { + const id = session.id; + activeSessions.delete(id); + // console.log(`Active - ${JSON.stringify(id)} = ${activeSessions.size}`); + pooledSessions.add(id); + // console.log(`Pooled + ${JSON.stringify(id)} = ${activeSessions.size}`); + return _release.apply(this, arguments); + }); + + [core.Server, core.ReplSet, core.Mongos].forEach(topology => { + const _endSessions = topology.prototype.endSessions; + sandbox.stub(topology.prototype, 'endSessions').callsFake(function(sessions) { + sessions = Array.isArray(sessions) ? sessions : [sessions]; + + sessions.forEach(id => pooledSessions.delete(id)); + + return _endSessions.apply(this, arguments); + }); + }); + + const _close = MongoClient.prototype.close; + sandbox.stub(MongoClient.prototype, 'close').callsFake(function() { + activeSessionsBeforeClose = new Set(activeSessions); + + return _close.apply(this, arguments); + }); +}); + +afterEach('Session Leak After Each - ensure no leaks', function() { + if ( + this.currentTest.state === 'failed' || + getSessionLeakMetadata(this.currentTest).skipLeakTests + ) { + return; + } + + try { + expect( + activeSessionsBeforeClose.size, + `test is leaking ${activeSessionsBeforeClose.size} active sessions while running client` + ).to.equal(0); + + expect( + activeSessions.size, + `client close failed to clean up ${activeSessions.size} active sessions` + ).to.equal(0); + + expect( + pooledSessions.size, + `client close failed to clean up ${pooledSessions.size} pooled sessions` + ).to.equal(0); + } catch (e) { + this.test.error(e); + } +}); diff --git a/test/functional/sharding_read_preference_tests.js b/test/functional/sharding_read_preference_tests.js index 6e116ac195c..6e2dc6d179d 100644 --- a/test/functional/sharding_read_preference_tests.js +++ b/test/functional/sharding_read_preference_tests.js @@ -42,6 +42,8 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; + // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -49,7 +51,7 @@ describe('Sharding (Read Preference)', function() { options.className === 'Cursor' && options.message.indexOf('"mode":"secondary"') !== -1 ) { - done(); + gotMessage = true; } }); @@ -59,11 +61,13 @@ describe('Sharding (Read Preference)', function() { function(err, item) { test.equal(null, err); test.equal(1, item.test); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -106,6 +110,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -113,7 +118,7 @@ describe('Sharding (Read Preference)', function() { options.className === 'Cursor' && options.message.indexOf('"mode":"notsupported"') !== -1 ) { - done(); + gotMessage = true; } }); @@ -122,11 +127,13 @@ describe('Sharding (Read Preference)', function() { { readPreference: new ReadPreference('notsupported') }, function(err) { test.ok(err != null); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -169,6 +176,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -178,7 +186,7 @@ describe('Sharding (Read Preference)', function() { '{"mode":"secondary","tags":[{"dc":"sf","s":"1"},{"dc":"ma","s":"2"}]}' ) !== -1 ) { - done(); + gotMessage = true; } }); @@ -192,10 +200,12 @@ describe('Sharding (Read Preference)', function() { }, function(err) { test.ok(err != null); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); @@ -238,6 +248,7 @@ describe('Sharding (Read Preference)', function() { // Set debug level for the driver Logger.setLevel('debug'); + let gotMessage = false; // Get the current logger Logger.setCurrentLogger(function(message, options) { if ( @@ -246,7 +257,7 @@ describe('Sharding (Read Preference)', function() { options.message.indexOf('{"mode":"secondary","tags":[{"loc":"ny"},{"loc":"sf"}]}') !== -1 ) { - done(); + gotMessage = true; } }); @@ -261,10 +272,12 @@ describe('Sharding (Read Preference)', function() { function(err, item) { test.equal(null, err); test.equal(1, item.test); + test.ok(gotMessage); // Set error level for the driver Logger.setLevel('error'); // Close db connection client.close(); + done(); } ); }); diff --git a/test/unit/sessions/client_tests.js b/test/unit/sessions/client_tests.js index 00e371eaa29..99d9b876a36 100644 --- a/test/unit/sessions/client_tests.js +++ b/test/unit/sessions/client_tests.js @@ -60,6 +60,7 @@ describe('Sessions', function() { let session = client.startSession(); expect(session).to.exist; + session.endSession({ skipCommand: true }); client.close(); done(); }); diff --git a/test/unit/sessions/collection_tests.js b/test/unit/sessions/collection_tests.js index 298e13822d5..ebcffddebe2 100644 --- a/test/unit/sessions/collection_tests.js +++ b/test/unit/sessions/collection_tests.js @@ -44,6 +44,8 @@ describe('Sessions', function() { .then(() => { expect(findCommand.readConcern).to.have.keys(['level', 'afterClusterTime']); expect(findCommand.readConcern.afterClusterTime).to.eql(insertOperationTime); + + session.endSession({ skipCommand: true }); return client.close(); }); });