Skip to content

Commit

Permalink
Fixed the HA to run on it's own connection instead of using the exist…
Browse files Browse the repository at this point in the history
…ing connections so any problem in HA will not cause a driver ripple effect
  • Loading branch information
christkv committed Nov 6, 2012
1 parent 6113325 commit 0fb3a45
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 134 deletions.
1 change: 1 addition & 0 deletions HISTORY
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Force correct setting of read_secondary based on the read preference (Issue #741)
- If using read preferences with secondaries queries will not fail if primary is down (Issue #744)
- noOpen connection for Db.connect removed as not compatible with autodetection of Mongo type
- Mongos connection with auth not working (Issue #737)

1.1.11 2012-10-10
-----------------
Expand Down
119 changes: 53 additions & 66 deletions lib/mongodb/connection/repl_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var ReplSet = exports.ReplSet = function(servers, options) {
this._numberOfServersLeftToInitialize = 0;
// Do we record server stats or not
this.recordQueryStats = false;
// Update health try server
this.updateHealthServerTry = 0;

// Get the readPreference
var readPreference = this.options['readPreference'];
Expand Down Expand Up @@ -251,23 +253,6 @@ ReplSet.prototype.isPrimary = function(config) {
*/
ReplSet.prototype.isReadPrimary = ReplSet.prototype.isPrimary;

/**
* @ignore
**/
ReplSet.prototype._checkReplicaSet = function() {
if(!this.haEnabled) return false;
var currentTime = new Date().getTime();

if (this.__isDoingReplSetHealth) return false;

if((currentTime - this.lastReplicaSetTime) >= this.replicasetStatusCheckInterval) {
this.lastReplicaSetTime = currentTime;
return true;
} else {
return false;
}
}

/**
* @ignore
*/
Expand Down Expand Up @@ -304,43 +289,73 @@ ReplSet.prototype.allServerInstances = function() {
}

/**
* Refresh the health status / configuration of this replicaset.
* Detect failover, addition of new nodes, etc.
* Enables high availability pings.
*
* @param {Db} db
* @ignore
*/
ReplSet.prototype.updateHealth = function(db, callback) {
if(!this._checkReplicaSet()) {
return callback(null, null);
}

ReplSet.prototype._enableHA = function () {
var self = this;
self.__isDoingReplSetHealth = true;
return check();

function ping () {
if("disconnected" == self._serverState) return;

if(Object.keys(self._state.addresses).length == 0) return;
var selectedServer = self._state.addresses[Object.keys(self._state.addresses)[self.updateHealthServerTry++]];
if(self.updateHealthServerTry >= Object.keys(self._state.addresses).length) self.updateHealthServerTry = 0;
if(selectedServer == null) return check();

// If we have an active db instance
if(self.dbInstances.length > 0) {
var db = self.dbInstances[0];

// Create a new master connection
var _server = new Server(selectedServer.host, selectedServer.port, {
auto_reconnect: false,
returnIsMasterResults: true,
slaveOk: true,
socketOptions: { connectTimeoutMS: 1000}
});

var cmd = DbCommand.createIsMasterCommand(db);
db._executeQueryCommand(cmd, {failFast:true}, function(err, res) {
if(err) {
self.__isDoingReplSetHealth = false;
return callback(err, null);
// Connect using the new _server connection to not impact the driver
// behavior on any errors we could possibly run into
_server.connect(db, function(err, result, _server) {
if(err) {
if(_server.close) _server.close();
return check();
}

// Create is master command
var cmd = DbCommand.createIsMasterCommand(db);
// Execute is master command
db._executeQueryCommand(cmd, {failFast:true, connection: _server.checkoutReader()}, function(err, res) {
// Close the connection used
_server.close();
// If error let's set perform another check
if(err) return check();
// Validate the replicaset
self._validateReplicaset(res, db.auths, function() {
check();
});
});
});
}
}

self._validateReplicaset(res, db.auths, function() {
self.__isDoingReplSetHealth = false;
callback(null, null);
});
})
function check () {
self._haTimer = setTimeout(ping, self.replicasetStatusCheckInterval);
}
}

/**
* @ignore
*/
ReplSet.prototype._validateReplicaset = function(result, auths, cb) {
var self = this;

var res = result.documents[0];

// manage master node changes
if (res.primary && self._state.master.name != res.primary) {
if(res.primary && self._state.master && self._state.master.name != res.primary) {
// Delete master record so we can rediscover it
delete self._state.addresses[self._state.master.name];

Expand Down Expand Up @@ -801,34 +816,6 @@ ReplSet.prototype._handleOnFullSetup = function (parent) {
this._enableHA();
}

/**
* Enables high availability pings.
*
* @ignore
*/
ReplSet.prototype._enableHA = function () {
var self = this;
return check();

function check () {
self._haTimer = setTimeout(ping, self.replicasetStatusCheckInterval);
}

function ping () {
// TODO do we ever become reconnected?
// if so need to solve restarting ping
if("disconnected" == self._serverState) return;

// If we are connected let's perform a healthcheck
if("connected" == self._serverState
&& Array.isArray(self.dbInstances) && self.dbInstances.length > 0) {
self.updateHealth(self.dbInstances[0], check);
} else {
check();
}
}
}

/**
* Disables high availability pings.
*
Expand Down
8 changes: 0 additions & 8 deletions lib/mongodb/connection/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,13 @@ Server.prototype.connect = function(dbInstance, options, callback) {
// Only execute callback if we have a caller
// chained is for findAndModify as it does not respect write concerns
if(callbackInfo && callbackInfo.callback && callbackInfo.info && Array.isArray(callbackInfo.info.chained)) {
// console.log("============================ chained")
// console.dir(callbackInfo)
// Check if callback has already been fired (missing chain command)
var chained = callbackInfo.info.chained;
var numberOfFoundCallbacks = 0;
for(var i = 0; i < chained.length; i++) {
if(dbInstanceObject._hasHandler(chained[i])) numberOfFoundCallbacks++;
}

// console.log("numberOfFoundCallbacks :: " + numberOfFoundCallbacks)
// console.log("chained :: " + chained.length)

// If we have already fired then clean up rest of chain and move on
if(numberOfFoundCallbacks != chained.length) {
for(var i = 0; i < chained.length; i++) {
Expand Down Expand Up @@ -401,9 +396,6 @@ Server.prototype.connect = function(dbInstance, options, callback) {
// chained commands
var firstResult = mongoReply && mongoReply.documents;

// console.log("====================================================== parse")
// console.dir(firstResult)

// Check for an error, if we have one let's trigger the callback and clean up
// The chained callbacks
if(firstResult[0].err != null || firstResult[0].errmsg != null) {
Expand Down
1 change: 0 additions & 1 deletion lib/mongodb/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -2038,7 +2038,6 @@ var _finishConnecting = function(serverConfig, object, options, callback) {
// Safe settings
var safe = {};
// Build the safe paramter if needed
// console.dir(object)
if(object.db_options.journal) safe.j = object.db_options.journal;
if(object.db_options.w) safe.w = object.db_options.w;
if(object.db_options.fsync) safe.fsync = object.db_options.fsync;
Expand Down
26 changes: 13 additions & 13 deletions lib/mongodb/gridfs/gridstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -440,19 +440,19 @@ var writeBuffer = function(self, buffer, close, callback) {
* @api private
*/
var buildMongoObject = function(self, callback) {
// Keeps the final chunk number
var chunkNumber = 0;
var previousChunkSize = 0;
// Get the correct chunk Number, if we have an empty chunk return the previous chunk number
if(null != self.currentChunk && self.currentChunk.chunkNumber > 0 && self.currentChunk.position == 0) {
chunkNumber = self.currentChunk.chunkNumber - 1;
} else {
chunkNumber = self.currentChunk.chunkNumber;
previousChunkSize = self.currentChunk.position;
}

// Calcuate the length
var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0;
// // Keeps the final chunk number
// var chunkNumber = 0;
// var previousChunkSize = 0;
// // Get the correct chunk Number, if we have an empty chunk return the previous chunk number
// if(null != self.currentChunk && self.currentChunk.chunkNumber > 0 && self.currentChunk.position == 0) {
// chunkNumber = self.currentChunk.chunkNumber - 1;
// } else {
// chunkNumber = self.currentChunk.chunkNumber;
// previousChunkSize = self.currentChunk.position;
// }

// // Calcuate the length
// var length = self.currentChunk != null ? (chunkNumber * self.chunkSize + previousChunkSize) : 0;
var mongoObject = {
'_id': self.fileId,
'filename': self.filename,
Expand Down
82 changes: 82 additions & 0 deletions test/auxilliary/replicaset_auth_connection_handling_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();

var testCase = require('nodeunit').testCase,
debug = require('util').debug,
inspect = require('util').inspect,
nodeunit = require('nodeunit'),
gleak = require('../../dev/tools/gleak'),
Db = mongodb.Db,
Cursor = mongodb.Cursor,
Collection = mongodb.Collection,
Server = mongodb.Server,
ReadPreference = mongodb.ReadPreference,
ReplSetServers = mongodb.ReplSetServers,
ReplicaSetManager = require('../../test/tools/replica_set_manager').ReplicaSetManager,
Step = require("step");

var MONGODB = 'integration_tests';
var serverManager = null;
var RS = RS == null ? null : RS;

/**
* Retrieve the server information for the current
* instance of the db client
*
* @ignore
*/
exports.setUp = function(callback) {
RS = new ReplicaSetManager({retries:120,
// auth:true,
journal:true,
arbiter_count:0,
secondary_count:2,
passive_count:0});
RS.startSet(true, function(err, result) {
if(err != null) throw err;
// Finish setup
callback();
});
}

/**
* Retrieve the server information for the current
* instance of the db client
*
* @ignore
*/
exports.tearDown = function(callback) {
callback();
}

exports['Should correctly handle replicaset master stepdown and stepup without loosing auth'] = function(test) {
var replSet = new ReplSetServers( [
new Server( 'localhost', 30000),
new Server( 'localhost', 30001)
],
{rs_name:"replica-set-foo", poolSize:1}
);

// Connect
new Db('replicaset_test_auth', replSet, {safe:false}).open(function(err, db) {
// Just set auths for the manager to handle it correctly
RS.setAuths("root", "root");
// Add a user
db.admin().addUser("root", "root", function(err, result) {
test.equal(null, err);

db.admin().authenticate("root", "root", function(err, result) {
test.equal(null, err);
test.ok(result);

RS.killPrimary(9, function(err, result) {
db.collection('replicaset_test_auth').insert({a:1}, {safe:true}, function(err, result) {
test.equal(null, err);

db.close();
test.done();
});
});
});
});
});
}
Loading

0 comments on commit 0fb3a45

Please sign in to comment.