Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: move session support check to operation layer #2739

Merged
merged 5 commits into from
Mar 11, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor to use maybePromise, clean up comments and organize control …
…flow
  • Loading branch information
nbbeeken committed Mar 9, 2021
commit f621a498caea16014b2251aa6619fc37505447a5
108 changes: 46 additions & 62 deletions lib/operations/execute_operation.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict';

const maybePromise = require('../utils').maybePromise;
const MongoError = require('../core/error').MongoError;
const Aspect = require('./operation').Aspect;
const OperationBase = require('./operation').OperationBase;
Expand All @@ -21,7 +22,7 @@ const isUnifiedTopology = require('../core/utils').isUnifiedTopology;
* @param {Operation} operation The operation to execute
* @param {function} callback The command result callback
*/
function executeOperation(topology, operation, callback) {
function executeOperation(topology, operation, cb) {
if (topology == null) {
throw new TypeError('This method requires a valid topology instance');
}
Expand All @@ -30,74 +31,57 @@ function executeOperation(topology, operation, callback) {
throw new TypeError('This method requires a valid operation instance');
}

const Promise = topology.s.promiseLibrary;

let result;
if (typeof callback !== 'function') {
result = new Promise((resolve, reject) => {
callback = (err, res) => {
if (err) return reject(err);
resolve(res);
};
});
}

if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
// Recursive call to executeOperation after a server selection
// shouldCheckForSessionSupport should return false after a server selection because
// there will be data bearing servers
selectServerForSessionSupport(topology, operation, callback);
return result;
}

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
callback(new MongoError('Use of expired sessions is not permitted'));
return result;
return maybePromise(topology, cb, callback => {
if (isUnifiedTopology(topology) && topology.shouldCheckForSessionSupport()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry to be so nitpicky, but I prefer what we do in the 4.0 branch where there's a maybePromise for each of these cases. The way this is written right now, you might end up nesting promises, whereas in 4.0 if you haven't provided a callback we are using the callback from the top maybePromise to pass into the secondary executeOperation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(No worries, please nitpick away!)
I don't see how you can nest promises here since selectServerForSessionSupport will get passed a defined callback from maybePromise and so the next call to executeOperation will have that callback defined so the "second" maybePromise won't create a new Promise.

Something to consider is that this provides one point of exit to executeOperation. Entirely an 🧅 but that pattern is preferable to what is on 4.0. It's a strange pattern to me to have more code run "synchronously" in the remainder of the function after you've returned a function that appears to do an async task. (In this instance, I know maybePromise is all sync it's just wrapping a bit of setup work, but my style point remains). I'm glossing over the callback detail by just returning but we get the idea.

function runOp() {
  if (cond)
    return doThing(() => {
      // now we've done something
      return 'one way'
    })

  // do more things
  return doThing(() => {
      // now we've done something
      return 'the other way'
  })
}

vs.

function runOp() {
  return doThing(() => {
    if (cond) return 'one way'
    
    // do more things
    return 'the other way'
  })
}

Copy link
Member

@mbroadst mbroadst Mar 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you're right there's no Promise nesting. Maybe my ultimate concern is just about the multiple maybePromise calls in the executeOperation stacktrace. A call to insertOne right now will look like:

  • executeOperation
  • maybePromise
  • selectServerForSessionSupport
  • executeOperation
  • maybePromise
  • ....

I guess we could provide a further abstraction of executeOperation (executeOperationImpl or something) where you could skip one of the maybePromise, but maybe it's not so much of a concern here. It's unlikely that selectServerForSessionSupport is called many times, so this isn't a big problem.

Ok! Sounds good to me, maybe you want to make a similar change to the 4.0 branch while you're here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we'll need to make a port of this sessions related fix to 4.0 so I can do that when I'm there (If anyone else reviewing has a differing opinion please feel free to chime in!)

// Recursive call to executeOperation after a server selection
return selectServerForSessionSupport(topology, operation, callback);
}
} else if (operation.session && !topology.hasSessionSupport()) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
callback(new MongoError('Current topology does not support sessions'));
return result;
}

function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, owner;
if (topology.hasSessionSupport()) {
if (operation.session == null) {
owner = Symbol();
session = topology.startSession({ owner });
operation.session = session;
} else if (operation.session.hasEnded) {
return callback(new MongoError('Use of expired sessions is not permitted'));
}
} else if (operation.session) {
// If the user passed an explicit session and we are still, after server selection,
// trying to run against a topology that doesn't support sessions we error out.
return callback(new MongoError('Current topology does not support sessions'));
}

callback(err, result);
}

try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (error) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
function executeCallback(err, result) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

callback(err, result);
}

callback(error);
}
try {
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
executeWithServerSelection(topology, operation, executeCallback);
} else {
operation.execute(executeCallback);
}
} catch (error) {
if (session && session.owner === owner) {
session.endSession();
if (operation.session === session) {
operation.clearSession();
}
}

return result;
callback(error);
}
});
}

function supportsRetryableReads(server) {
Expand Down Expand Up @@ -165,8 +149,8 @@ function executeWithServerSelection(topology, operation, callback) {
});
}

// TODO: This is only supported for unified topology, it should go away once
// we remove support for legacy topology types.
// The Unified Topology runs serverSelection before executing every operation
// Session support is determined by the result of a monitoring check triggered by this selection
function selectServerForSessionSupport(topology, operation, callback) {
topology.selectServer(ReadPreference.primaryPreferred, err => {
if (err) {
Expand Down
26 changes: 7 additions & 19 deletions test/unit/sessions/client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Sessions', function() {
});
});

it('should throw an exception if sessions are not supported', {
it('should not throw a synchronous exception if sessions are not supported', {
metadata: { requires: { topology: 'single' } },
test() {
test.server.setMessageHandler(request => {
Expand All @@ -27,24 +27,12 @@ describe('Sessions', function() {
});

const client = this.configuration.newClient(`mongodb://${test.server.uri()}/test`);
return client
.connect()
.then(function(client) {
const session = client.startSession();
return client
.db()
.collection('t')
.insertOne({ a: 1 }, { session });
})
.then(() => {
expect.fail('Expected an error to be thrown about not supporting sessions');
})
.catch(error => {
expect(error.message).to.equal('Current topology does not support sessions');
})
.then(() => {
return client.close();
});
return client.connect().then(() => {
expect(() => client.startSession()).to.not.throw(
'Current topology does not support sessions'
);
return client.close();
});
}
});

Expand Down