Skip to content

Commit

Permalink
feat(read-preference): unify means of read preference resolution (#1738)
Browse files Browse the repository at this point in the history
* feat(read-preference): unify means of read preference resolution

NODE-1515
  • Loading branch information
mbroadst authored Jun 18, 2018
1 parent fdb828b commit 2995e11
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 76 deletions.
13 changes: 8 additions & 5 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const ordered = require('./bulk/ordered');
const ChangeStream = require('./change_stream');
const executeOperation = require('./utils').executeOperation;
const applyWriteConcern = require('./utils').applyWriteConcern;
const getReadPreference = require('./utils').getReadPreference;
const resolveReadPreference = require('./utils').resolveReadPreference;

// Operations
const bulkWrite = require('./operations/collection_ops').bulkWrite;
Expand Down Expand Up @@ -340,7 +340,10 @@ Collection.prototype.find = function(query, options, callback) {
newOptions.slaveOk = options.slaveOk != null ? options.slaveOk : this.s.db.slaveOk;

// Add read preference if needed
newOptions = getReadPreference(this, newOptions, this.s.db);
newOptions.readPreference = resolveReadPreference(newOptions, {
db: this.s.db,
collection: this
});

// Set slave ok to true if read preference different from primary
if (
Expand Down Expand Up @@ -1223,7 +1226,7 @@ Collection.prototype.listIndexes = function(options) {
// Clone the options
options = Object.assign({}, options);
// Determine the read preference in the options.
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });
// Set the CommandCursor constructor
options.cursorFactory = CommandCursor;
// Set the promiseLibrary
Expand Down Expand Up @@ -1738,7 +1741,7 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });

// If explain has been specified add it
if (options.explain) {
Expand Down Expand Up @@ -1836,7 +1839,7 @@ Collection.prototype.parallelCollectionScan = function(options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(this, options, this.s.db, this);
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });

// Add a promiseLibrary
options.promiseLibrary = this.s.promiseLibrary;
Expand Down
17 changes: 6 additions & 11 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const Collection = require('./collection');
const mergeOptionsAndWriteConcern = require('./utils').mergeOptionsAndWriteConcern;
const executeOperation = require('./utils').executeOperation;
const applyWriteConcern = require('./utils').applyWriteConcern;
const convertReadPreference = require('./utils').convertReadPreference;
const resolveReadPreference = require('./utils').resolveReadPreference;
const ChangeStream = require('./change_stream');

// Operations
Expand Down Expand Up @@ -477,11 +477,10 @@ Db.prototype.listCollections = function(filter, options) {
options.promiseLibrary = this.s.promiseLibrary;

// Ensure valid readPreference
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
} else {
options.readPreference = this.s.readPreference || ReadPreference.primary;
}
options.readPreference = resolveReadPreference(options, {
db: this,
default: ReadPreference.primary
});

// Cursor options
let cursor = options.batchSize ? { batchSize: options.batchSize } : {};
Expand Down Expand Up @@ -673,11 +672,7 @@ Db.prototype.collections = function(options, callback) {
Db.prototype.executeDbAdminCommand = function(selector, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// Convert read preference
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
}
options.readPreference = resolveReadPreference(options);

return executeOperation(this.s.topology, executeDbAdminCommand, [
this,
Expand Down
14 changes: 7 additions & 7 deletions lib/operations/collection_ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const evaluate = require('./db_ops').evaluate;
const executeCommand = require('./db_ops').executeCommand;
const executeDbAdminCommand = require('./db_ops').executeDbAdminCommand;
const formattedOrderClause = require('../utils').formattedOrderClause;
const getReadPreference = require('../utils').getReadPreference;
const resolveReadPreference = require('../utils').resolveReadPreference;
const handleCallback = require('../utils').handleCallback;
const indexInformationDb = require('./db_ops').indexInformation;
const isObject = require('../utils').isObject;
Expand Down Expand Up @@ -195,7 +195,7 @@ function count(coll, query, options, callback) {
if (hint) cmd.hint = hint;

// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(cmd, coll, options);
Expand Down Expand Up @@ -359,7 +359,7 @@ function distinct(coll, key, query, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Add maxTimeMS if defined
if (typeof maxTimeMS === 'number') cmd.maxTimeMS = maxTimeMS;
Expand Down Expand Up @@ -634,7 +634,7 @@ function geoHaystackSearch(coll, x, y, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(commandObject, coll, options);
Expand Down Expand Up @@ -694,7 +694,7 @@ function group(coll, keys, condition, initial, reduce, finalize, command, option

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Do we have a readConcern specified
decorateWithReadConcern(selector, coll, options);
Expand Down Expand Up @@ -891,7 +891,7 @@ function mapReduce(coll, map, reduce, options, callback) {
options = Object.assign({}, options);

// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// If we have a read preference and inline is not set as output fail hard
if (
Expand Down Expand Up @@ -1280,7 +1280,7 @@ function stats(coll, options, callback) {

options = Object.assign({}, options);
// Ensure we have the right read preference inheritance
options = getReadPreference(coll, options, coll.s.db, coll);
options.readPreference = resolveReadPreference(options, { db: coll.s.db, collection: coll });

// Execute the command
executeCommand(coll.s.db, commandObject, options, callback);
Expand Down
8 changes: 2 additions & 6 deletions lib/operations/db_ops.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const applyWriteConcern = require('../utils').applyWriteConcern;
const Code = require('mongodb-core').BSON.Code;
const convertReadPreference = require('../utils').convertReadPreference;
const resolveReadPreference = require('../utils').resolveReadPreference;
const crypto = require('crypto');
const Db = require('../db');
const debugOptions = require('../utils').debugOptions;
Expand Down Expand Up @@ -482,11 +482,7 @@ function executeCommand(db, command, options, callback) {
}

// Convert the readPreference if its not a write
if (options.readPreference) {
options.readPreference = convertReadPreference(options.readPreference);
} else {
options.readPreference = ReadPreference.primary;
}
options.readPreference = resolveReadPreference(options, { default: ReadPreference.primary });

// Debug information
if (db.s.logger.isDebug())
Expand Down
90 changes: 43 additions & 47 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -496,60 +496,57 @@ function applyWriteConcern(target, sources, options) {
}

/**
* Ensures provided read preference is properly converted into an object
* @param {(ReadPreference|string|object)} readPreference the user provided read preference
* @return {ReadPreference}
* Resolves a read preference based on well-defined inheritance rules. This method will not only
* determine the read preference (if there is one), but will also ensure the returned value is a
* properly constructed instance of `ReadPreference`.
*
* @param {Object} options The options passed into the method, potentially containing a read preference
* @param {Object} sources Sources from which we can inherit a read preference
* @returns {(ReadPreference|null)} The resolved read preference
*/
function convertReadPreference(readPreference) {
if (readPreference) {
if (typeof readPreference === 'string') {
return new ReadPreference(readPreference);
} else if (
readPreference &&
!(readPreference instanceof ReadPreference) &&
typeof readPreference === 'object'
) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
return new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
});
}
} else if (!(readPreference instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + readPreference);
}
}
function resolveReadPreference(options, sources) {
options = options || {};
sources = sources || {};

return readPreference;
}
const db = sources.db;
const coll = sources.collection;
const defaultReadPreference = sources.default;

// Figure out the read preference
function getReadPreference(coll, options, db) {
let r = null;
let readPreference;
if (options.readPreference) {
r = options.readPreference;
} else if (coll.s.readPreference) {
r = coll.s.readPreference;
} else if (db.s.readPreference) {
r = db.s.readPreference;
} else {
return options;
}

if (typeof r === 'string') {
options.readPreference = new ReadPreference(r);
} else if (r && !(r instanceof ReadPreference) && typeof r === 'object') {
const mode = r.mode || r.preference;
readPreference = options.readPreference;
} else if (coll && coll.s.readPreference) {
readPreference = coll.s.readPreference;
} else if (db && db.s.readPreference) {
readPreference = db.s.readPreference;
} else if (defaultReadPreference) {
readPreference = defaultReadPreference;
}

// do we even have a read preference?
if (readPreference == null) {
return null;
}

// now attempt to convert the read preference if necessary
if (typeof readPreference === 'string') {
readPreference = new ReadPreference(readPreference);
} else if (
readPreference &&
!(readPreference instanceof ReadPreference) &&
typeof readPreference === 'object'
) {
const mode = readPreference.mode || readPreference.preference;
if (mode && typeof mode === 'string') {
options.readPreference = new ReadPreference(mode, r.tags, {
maxStalenessSeconds: r.maxStalenessSeconds
readPreference = new ReadPreference(mode, readPreference.tags, {
maxStalenessSeconds: readPreference.maxStalenessSeconds
});
}
} else if (!(r instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + r);
} else if (!(readPreference instanceof ReadPreference)) {
throw new TypeError('Invalid read preference: ' + readPreference);
}

return options;
return readPreference;
}

/**
Expand Down Expand Up @@ -625,9 +622,8 @@ module.exports = {
translateReadPreference,
executeOperation,
applyWriteConcern,
convertReadPreference,
resolveReadPreference,
isPromiseLike,
getReadPreference,
decorateWithCollation,
decorateWithReadConcern
};

0 comments on commit 2995e11

Please sign in to comment.