Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move session support check to operation layer #2739

Merged
merged 5 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,6 @@ MongoClient.prototype.startSession = function(options) {
throw new MongoError('Must connect to a server before calling this method');
}

if (!this.topology.hasSessionSupport()) {
throw new MongoError('Current topology does not support sessions');
}

return this.topology.startSession(options, this.s.options);
};

Expand Down
116 changes: 47 additions & 69 deletions lib/operations/execute_operation.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const maybePromise = require('../utils').maybePromise;
const MongoError = require('../core/error').MongoError;
const Aspect = require('./operation').Aspect;
const OperationBase = require('./operation').OperationBase;
Expand All @@ -21,7 +22,7 @@ const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
* @param {Operation} operation The operation to execute
* @param {function} callback The command result callback
*/
function executeOperation(topology, operation, callback) {
function executeOperation(topology, operation, cb) {
if (topology == null) {
throw new TypeError('This method requires a valid topology instance');
}
Expand All @@ -30,64 +31,57 @@ function executeOperation(topology, operation, callback) {
throw new TypeError('This method requires a valid operation instance');
}

if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
return selectServerForSessionSupport(topology, operation, callback);
}

const Promise = topology.s.promiseLibrary;

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
throw new MongoError('Use of expired sessions is not permitted');
return maybePromise(topology, cb, callback => {
if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry to be so nitpicky, but I prefer what we do in the 4.0 branch where there's a maybePromise for each of these cases. The way this is written right now, you might end up nesting promises, whereas in 4.0 if you haven't provided a callback we are using the callback from the top maybePromise to pass into the secondary executeOperation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(No worries, please nitpick away!)
I don't see how you can nest promises here since selectServerForSessionSupport will get passed a defined callback from maybePromise and so the next call to executeOperation will have that callback defined so the "second" maybePromise won't create a new Promise.

Something to consider is that this provides one point of exit to executeOperation. Entirely an 🧅 but that pattern is preferable to what is on 4.0. It's a strange pattern to me to have more code run "synchronously" in the remainder of the function after you've returned a function that appears to do an async task. (In this instance, I know maybePromise is all sync it's just wrapping a bit of setup work, but my style point remains). I'm glossing over the callback detail by just returning but we get the idea.

function runOp() {
  if (cond)
    return doThing(() => {
      // now we've done something
      return 'one way'
    })

  // do more things
  return doThing(() => {
      // now we've done something
      return 'the other way'
  })
}

vs.

function runOp() {
  return doThing(() => {
    if (cond) return 'one way'
    
    // do more things
    return 'the other way'
  })
}

Copy link
Member

@mbroadst mbroadst Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you're right there's no Promise nesting. Maybe my ultimate concern is just about the multiple maybePromise calls in the executeOperation stacktrace. A call to insertOne right now will look like:

  • executeOperation
  • maybePromise
  • selectServerForSessionSupport
  • executeOperation
  • maybePromise
  • ....

I guess we could provide a further abstraction of executeOperation (executeOperationImpl or something) where you could skip one of the maybePromise, but maybe it's not so much of a concern here. It's unlikely that selectServerForSessionSupport is called many times, so this isn't a big problem.

Ok! Sounds good to me, maybe you want to make a similar change to the 4.0 branch while you're here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we'll need to make a port of this sessions related fix to 4.0 so I can do that when I'm there (If anyone else reviewing has a differing opinion please feel free to chime in!)

// Recursive call to executeOperation after a server selection
return selectServerForSessionSupport(topology, operation, callback);
}
}

let result;
if (typeof callback !== 'function') {
result = new Promise((resolve, reject) => {
callback = (err, res) => {
if (err) return reject(err);
resolve(res);
};
});
}

function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
return callback(new MongoError('Use of expired sessions is not permitted'));
}
} else if (operation.session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoError('Current topology does not support sessions'));
}

callback(err, result);
}

try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (e) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

callback(err, result);
}

throw e;
}
try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (error) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

return result;
callback(error);
}
});
}

function supportsRetryableReads(server) {
Expand Down Expand Up @@ -139,7 +133,6 @@ function executeWithServerSelection(topology, operation, callback) {
callback(err, null);
return;
}

const shouldRetryReads =
topology.s.options.retryReads !== false &&
operation.session &&
Expand All @@ -156,31 +149,16 @@ function executeWithServerSelection(topology, operation, callback) {
});
}

// TODO: This is only supported for unified topology, it should go away once
// we remove support for legacy topology types.
// The Unified Topology runs serverSelection before executing every operation
// Session support is determined by the result of a monitoring check triggered by this selection
function selectServerForSessionSupport(topology, operation, callback) {
const Promise = topology.s.promiseLibrary;

let result;
if (typeof callback !== 'function') {
result = new Promise((resolve, reject) => {
callback = (err, result) => {
if (err) return reject(err);
resolve(result);
};
});
}

topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
callback(err);
return;
return callback(err);
}

executeOperation(topology, operation, callback);
});

return result;
}

module.exports = executeOperation;
4 changes: 2 additions & 2 deletions test/functional/find.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1067,7 +1067,7 @@ describe('Find', function() {
{ $set: { name: 'test2' } },
{},
function(err, updated_doc) {
test.equal(null, updated_doc);
expect(updated_doc).to.not.exist;
test.ok(err != null);
client.close(done);
}
Expand Down Expand Up @@ -1305,7 +1305,7 @@ describe('Find', function() {
{ a: 10, b: 10, failIndex: 2 },
{ w: 1, upsert: true },
function(err, result) {
test.equal(null, result);
expect(result).to.not.exist;
test.ok(err.errmsg.match('duplicate key'));
client.close(done);
}
Expand Down
10 changes: 5 additions & 5 deletions test/functional/insert.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1097,8 +1097,8 @@ describe('Insert', function() {
var db = client.db(configuration.db);
var collection = db.collection('Should_fail_on_insert_due_to_key_starting_with');
collection.insert(doc, configuration.writeConcernMax(), function(err, result) {
test.ok(err != null);
test.equal(null, result);
expect(err).to.exist;
expect(result).to.not.exist;

client.close(done);
});
Expand Down Expand Up @@ -1348,7 +1348,7 @@ describe('Insert', function() {

// Update two fields
collection.insert({ _id: 1 }, configuration.writeConcernMax(), function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down Expand Up @@ -2560,7 +2560,7 @@ describe('Insert', function() {
[{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }],
{ ordered: true },
function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down Expand Up @@ -2601,7 +2601,7 @@ describe('Insert', function() {
[{ a: 1 }, { a: 2 }, { a: 1 }, { a: 3 }, { a: 1 }],
{ ordered: true },
function(err, r) {
test.equal(r, null);
expect(r).to.not.exist;
test.ok(err != null);
test.ok(err.result);

Expand Down
4 changes: 2 additions & 2 deletions test/functional/operation_example.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2657,7 +2657,7 @@ describe('Operation Examples', function() {
],
{ w: 1, keepGoing: true },
function(err, result) {
test.equal(result, null);
expect(result).to.not.exist;
test.ok(err);
test.ok(err.result);

Expand Down Expand Up @@ -3115,7 +3115,7 @@ describe('Operation Examples', function() {

// Attemp to rename the first collection to the second one, this will fail
collection1.rename('test_rename_collection2', function(err, collection) {
test.equal(null, collection);
expect(collection).to.not.exist;
test.ok(err instanceof Error);
test.ok(err.message.length > 0);

Expand Down
36 changes: 22 additions & 14 deletions test/unit/sessions/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ describe('Sessions', function() {
});
});

it('should throw an exception if sessions are not supported', {
it('should not throw a synchronous exception if sessions are not supported', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
test() {
test.server.setMessageHandler(request => {
var doc = request.document;
if (doc.ismaster) {
Expand All @@ -27,13 +27,11 @@ describe('Sessions', function() {
});

const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`);
client.connect(function(err, client) {
expect(err).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);

client.close(done);
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
}
});
Expand All @@ -42,6 +40,7 @@ describe('Sessions', function() {
metadata: { requires: { topology: 'single' } },
test() {
const replicaSetMock = new ReplSetFixture();
let client;
return replicaSetMock
.setup({ doNotInitHandlers: true })
.then(() => {
Expand Down Expand Up @@ -92,14 +91,23 @@ describe('Sessions', function() {
return replicaSetMock.uri();
})
.then(uri => {
const client = this.configuration.newClient(uri);
client = this.configuration.newClient(uri);
return client.connect();
})
.then(client => {
expect(client.topology.s.description.logicalSessionTimeoutMinutes).to.not.exist;
expect(() => {
client.startSession();
}).to.throw(/Current topology does not support sessions/);
const session = client.startSession();
return client
.db()
.collection('t')
.insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.then(() => {
return client.close();
});
}
Expand Down