From 8b370a7ad784f5759c964cdfaec62e06c896dc95 Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Thu, 11 Mar 2021 12:24:52 -0500 Subject: [PATCH] fix: move session support check to operation layer (#2739) Monitor checks run on a timer and network errors cause a reset of the topology that would be corrected in the next cycle of the monitor, we were checking a property that gets updated by this async work. By moving the check to the operations layer we will allow users to obtain a session regardless of support and then emit an error if there is not support when the session is used in an operation. NODE-3100 --- lib/mongo_client.js | 4 - lib/operations/execute_operation.js | 116 +++++++++------------- test/functional/find.test.js | 4 +- test/functional/insert.test.js | 10 +- test/functional/operation_example.test.js | 4 +- test/unit/sessions/client.test.js | 36 ++++--- 6 files changed, 78 insertions(+), 96 deletions(-) diff --git a/lib/mongo_client.js b/lib/mongo_client.js index cc8c8a39ec..7ace024256 100644 --- a/lib/mongo_client.js +++ b/lib/mongo_client.js @@ -457,10 +457,6 @@ MongoClient.prototype.startSession = function(options) { throw new MongoError('Must connect to a server before calling this method'); } - if (!this.topology.hasSessionSupport()) { - throw new MongoError('Current topology does not support sessions'); - } - return this.topology.startSession(options, this.s.options); }; diff --git a/lib/operations/execute_operation.js b/lib/operations/execute_operation.js index 80d57857e8..c2255980d1 100644 --- a/lib/operations/execute_operation.js +++ b/lib/operations/execute_operation.js @@ -1,5 +1,6 @@ 'use strict'; +const maybePromise = require('../utils').maybePromise; const MongoError = require('../core/error').MongoError; const Aspect = require('./operation').Aspect; const OperationBase = require('./operation').OperationBase; @@ -21,7 +22,7 @@ const isUnifiedTopology = require('../core/utils').isUnifiedTopology; * @param {Operation} operation The operation to execute * @param {function} callback The command result callback */ -function executeOperation(topology, operation, callback) { +function executeOperation(topology, operation, cb) { if (topology == null) { throw new TypeError('This method requires a valid topology instance'); } @@ -30,64 +31,57 @@ function executeOperation(topology, operation, callback) { throw new TypeError('This method requires a valid operation instance'); } - if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) { - return selectServerForSessionSupport(topology, operation, callback); - } - - const Promise = topology.s.promiseLibrary; - - // The driver sessions spec mandates that we implicitly create sessions for operations - // that are not explicitly provided with a session. - let session, owner; - if (topology.hasSessionSupport()) { - if (operation.session == null) { - owner = Symbol(); - session = topology.startSession({ owner }); - operation.session = session; - } else if (operation.session.hasEnded) { - throw new MongoError('Use of expired sessions is not permitted'); + return maybePromise(topology, cb, callback => { + if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) { + // Recursive call to executeOperation after a server selection + return selectServerForSessionSupport(topology, operation, callback); } - } - - let result; - if (typeof callback !== 'function') { - result = new Promise((resolve, reject) => { - callback = (err, res) => { - if (err) return reject(err); - resolve(res); - }; - }); - } - function executeCallback(err, result) { - if (session && session.owner === owner) { - session.endSession(); - if (operation.session === session) { - operation.clearSession(); + // The driver sessions spec mandates that we implicitly create sessions for operations + // that are not explicitly provided with a session. + let session, owner; + if (topology.hasSessionSupport()) { + if (operation.session == null) { + owner = Symbol(); + session = topology.startSession({ owner }); + operation.session = session; + } else if (operation.session.hasEnded) { + return callback(new MongoError('Use of expired sessions is not permitted')); } + } else if (operation.session) { + // If the user passed an explicit session and we are still, after server selection, + // trying to run against a topology that doesn't support sessions we error out. + return callback(new MongoError('Current topology does not support sessions')); } - callback(err, result); - } - - try { - if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { - executeWithServerSelection(topology, operation, executeCallback); - } else { - operation.execute(executeCallback); - } - } catch (e) { - if (session && session.owner === owner) { - session.endSession(); - if (operation.session === session) { - operation.clearSession(); + function executeCallback(err, result) { + if (session && session.owner === owner) { + session.endSession(); + if (operation.session === session) { + operation.clearSession(); + } } + + callback(err, result); } - throw e; - } + try { + if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) { + executeWithServerSelection(topology, operation, executeCallback); + } else { + operation.execute(executeCallback); + } + } catch (error) { + if (session && session.owner === owner) { + session.endSession(); + if (operation.session === session) { + operation.clearSession(); + } + } - return result; + callback(error); + } + }); } function supportsRetryableReads(server) { @@ -139,7 +133,6 @@ function executeWithServerSelection(topology, operation, callback) { callback(err, null); return; } - const shouldRetryReads = topology.s.options.retryReads !== false && operation.session && @@ -156,31 +149,16 @@ function executeWithServerSelection(topology, operation, callback) { }); } -// TODO: This is only supported for unified topology, it should go away once -// we remove support for legacy topology types. +// The Unified Topology runs serverSelection before executing every operation +// Session support is determined by the result of a monitoring check triggered by this selection function selectServerForSessionSupport(topology, operation, callback) { - const Promise = topology.s.promiseLibrary; - - let result; - if (typeof callback !== 'function') { - result = new Promise((resolve, reject) => { - callback = (err, result) => { - if (err) return reject(err); - resolve(result); - }; - }); - } - topology.selectServer(ReadPreference.primaryPreferred, err => { if (err) { - callback(err); - return; + return callback(err); } executeOperation(topology, operation, callback); }); - - return result; } module.exports = executeOperation; diff --git a/test/functional/find.test.js b/test/functional/find.test.js index 08454c63dd..da8d4c9564 100644 --- a/test/functional/find.test.js +++ b/test/functional/find.test.js @@ -1067,7 +1067,7 @@ describe('Find', function() { { $set: { name: 'test2' } }, {}, function(err, updated_doc) { - test.equal(null, updated_doc); + expect(updated_doc).to.not.exist; test.ok(err != null); client.close(done); } @@ -1305,7 +1305,7 @@ describe('Find', function() { { a: 10, b: 10, failIndex: 2 }, { w: 1, upsert: true }, function(err, result) { - test.equal(null, result); + expect(result).to.not.exist; test.ok(err.errmsg.match('duplicate key')); client.close(done); } diff --git a/test/functional/insert.test.js b/test/functional/insert.test.js index d560ea77f3..2bd933f39a 100644 --- a/test/functional/insert.test.js +++ b/test/functional/insert.test.js @@ -1097,8 +1097,8 @@ describe('Insert', function() { var db = client.db(configuration.db); var collection = db.collection('Should_fail_on_insert_due_to_key_starting_with'); collection.insert(doc, configuration.writeConcernMax(), function(err, result) { - test.ok(err != null); - test.equal(null, result); + expect(err).to.exist; + expect(result).to.not.exist; client.close(done); }); @@ -1348,7 +1348,7 @@ describe('Insert', function() { // Update two fields collection.insert({ _id: 1 }, configuration.writeConcernMax(), function(err, r) { - test.equal(r, null); + expect(r).to.not.exist; test.ok(err != null); test.ok(err.result); @@ -2560,7 +2560,7 @@ describe('Insert', function() { [{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }], { ordered: true }, function(err, r) { - test.equal(r, null); + expect(r).to.not.exist; test.ok(err != null); test.ok(err.result); @@ -2601,7 +2601,7 @@ describe('Insert', function() { [{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }], { ordered: true }, function(err, r) { - test.equal(r, null); + expect(r).to.not.exist; test.ok(err != null); test.ok(err.result); diff --git a/test/functional/operation_example.test.js b/test/functional/operation_example.test.js index 189e7ee069..34e87c5ddf 100644 --- a/test/functional/operation_example.test.js +++ b/test/functional/operation_example.test.js @@ -2657,7 +2657,7 @@ describe('Operation Examples', function() { ], { w: 1, keepGoing: true }, function(err, result) { - test.equal(result, null); + expect(result).to.not.exist; test.ok(err); test.ok(err.result); @@ -3115,7 +3115,7 @@ describe('Operation Examples', function() { // Attemp to rename the first collection to the second one, this will fail collection1.rename('test_rename_collection2', function(err, collection) { - test.equal(null, collection); + expect(collection).to.not.exist; test.ok(err instanceof Error); test.ok(err.message.length > 0); diff --git a/test/unit/sessions/client.test.js b/test/unit/sessions/client.test.js index 7d9005a816..faca91fe87 100644 --- a/test/unit/sessions/client.test.js +++ b/test/unit/sessions/client.test.js @@ -14,9 +14,9 @@ describe('Sessions', function() { }); }); - it('should throw an exception if sessions are not supported', { + it('should not throw a synchronous exception if sessions are not supported', { metadata: { requires: { topology: 'single' } }, - test: function(done) { + test() { test.server.setMessageHandler(request => { var doc = request.document; if (doc.ismaster) { @@ -27,13 +27,11 @@ describe('Sessions', function() { }); const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`); - client.connect(function(err, client) { - expect(err).to.not.exist; - expect(() => { - client.startSession(); - }).to.throw(/Current topology does not support sessions/); - - client.close(done); + return client.connect().then(() => { + expect(() => client.startSession()).to.not.throw( + 'Current topology does not support sessions' + ); + return client.close(); }); } }); @@ -42,6 +40,7 @@ describe('Sessions', function() { metadata: { requires: { topology: 'single' } }, test() { const replicaSetMock = new ReplSetFixture(); + let client; return replicaSetMock .setup({ doNotInitHandlers: true }) .then(() => { @@ -92,14 +91,23 @@ describe('Sessions', function() { return replicaSetMock.uri(); }) .then(uri => { - const client = this.configuration.newClient(uri); + client = this.configuration.newClient(uri); return client.connect(); }) .then(client => { - expect(client.topology.s.description.logicalSessionTimeoutMinutes).to.not.exist; - expect(() => { - client.startSession(); - }).to.throw(/Current topology does not support sessions/); + const session = client.startSession(); + return client + .db() + .collection('t') + .insertOne({ a: 1 }, { session }); + }) + .then(() => { + expect.fail('Expected an error to be thrown about not supporting sessions'); + }) + .catch(error => { + expect(error.message).to.equal('Current topology does not support sessions'); + }) + .then(() => { return client.close(); }); }