Skip to content

Commit

Permalink
Started refactoring code to allow for seperate reader and writer conn…
Browse files Browse the repository at this point in the history
…ections
  • Loading branch information
christkv committed May 17, 2011
1 parent 9677c19 commit c4195d7
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 30 deletions.
69 changes: 66 additions & 3 deletions integration/replicaset/query_secondaries.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,79 @@ module.exports = testCase({
// Insert some data
var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
debug("======================================================= open")
debug("err :: " + inspect(err))

// Drop collection on replicaset
p_db.dropCollection('testsets', function(err, r) {
test.equal(false, p_db.serverConfig.isReadPrimary());
test.equal(false, p_db.serverConfig.isPrimary());
test.done();
});
})
},

shouldCorrectlyTestConnection : function(test) {
// Replica configuration
var replSet = new ReplSetServers( [
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
],
{rs_name:RS.name, read_secondary:true}
);

// Insert some data
var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
// Drop collection on replicaset
p_db.dropCollection('testsets', function(err, r) {
test.ok(p_db.serverConfig.primary != null);
test.ok(p_db.serverConfig.read != null);
test.ok(p_db.serverConfig.primary.port != p_db.serverConfig.read.port);
test.done();
});
})
},

shouldCorrectlyQuerySecondaries : function(test) {
// Replica configuration
var replSet = new ReplSetServers( [
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
],
{rs_name:RS.name, read_secondary:true}
);

// Insert some data
var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
p_db.collection("testsets", {safe:{w:3, wtimeout:10000}}, function(err, collection) {
Step(
function inserts() {
var group = this.group();
collection.save({a:20}, group());
collection.save({a:30}, group());
collection.save({a:40}, group());
},

function done(err, values) {
var results = [];

collection.find().each(function(err, item) {
if(item == null) {
// Check all the values
var r = [20, 30, 40];
for(var i = 0; i < r.length; i++) {
test.equal(1, results.filter(function(element) {
return element.a == r[i];
}).length);
}

// p_db.close();
test.done();
} else {
results.push(item);
}
});
}
);
});
})
}
})

Expand Down
2 changes: 1 addition & 1 deletion integration/tools/replica_set_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ ReplicaSetManager.prototype.startSet = function(callback) {
ReplicaSetManager.prototype.initiate = function(callback) {
var self = this;
// Get master connection
self.getConnection(function(err, connection) {
self.getConnection(function(err, connection) {
if(err != null) return callback(err, null);
// Set replica configuration
connection.admin().command({replSetInitiate:self.config}, function(err, result) {
Expand Down
33 changes: 21 additions & 12 deletions lib/mongodb/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,13 @@ var toString = Object.prototype.toString;
* @param {Function} pkFactory
*/

function Collection (db, collectionName, pkFactory) {
function Collection (db, collectionName, pkFactory, options) {
this.checkCollectionName(collectionName);

this.db = db;
this.collectionName = collectionName;
this.internalHint;
this.opts = options != null && ('object' === typeof options) ? options : {};

this.pkFactory = pkFactory == null
? db.bson_serializer.ObjectID
Expand Down Expand Up @@ -163,8 +164,10 @@ Collection.prototype.remove = function remove (selector, options, callback) {
return callback(result);
}

if (options.safe || this.db.strict) {
var errorOptions = options.safe != null ? options.safe : null;
if (options.safe || this.opts.safe != null || this.db.strict) {
var errorOptions = options.safe != null ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

this.db.error(errorOptions, function (err, error) {
if (error[0].err) {
callback(new Error(error[0].err));
Expand Down Expand Up @@ -244,6 +247,7 @@ Collection.prototype.insertAll = function insertAll (docs, options, callback) {
// If safe is defined check for error message
if (options.safe || this.db.strict) {
var errorOptions = options.safe != null ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

this.db.error(errorOptions, function (err, error) {
if (error[0].err) {
Expand All @@ -266,15 +270,19 @@ Collection.prototype.insertAll = function insertAll (docs, options, callback) {
*/

Collection.prototype.save = function save (doc, options, callback) {
if ('function' === typeof options) callback = options, options = null;
var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
options = args.length ? args.shift() : {};

var safe = options && options.safe;
var errorOptions = options.safe != null ? options.safe : false;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

var id = doc['_id'];

if (id) {
this.update({ _id: id }, doc, { upsert: true, safe: safe }, callback);
this.update({ _id: id }, doc, { upsert: true, safe: errorOptions }, callback);
} else {
this.insert(doc, { safe: safe }, callback && function (err, docs) {
this.insert(doc, { safe: errorOptions }, callback && function (err, docs) {
if (err) return callback(err, null);

if (Array.isArray(docs)) {
Expand Down Expand Up @@ -323,6 +331,7 @@ Collection.prototype.update = function update (selector, document, options, call
// If safe, we need to check for successful execution
if (options && options.safe || this.db.strict) {
var errorOptions = options.safe != null ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;
this.db.error(errorOptions, function (err, error) {
// FIXME: handle err
if (error[0].err) {
Expand Down Expand Up @@ -354,7 +363,7 @@ Collection.prototype.distinct = function distinct (key, query, callback) {

var cmd = DbCommand.createDbCommand(this.db, mapCommandHash);

this.db.executeCommand(cmd, function (err, result) {
this.db.executeCommand(cmd, true, function (err, result) {
if (err) {
return callback(err);
}
Expand Down Expand Up @@ -393,7 +402,7 @@ Collection.prototype.count = function count (query, callback) {
, null
);

this.db.executeCommand(queryCommand, function (err, result) {
this.db.executeCommand(queryCommand, true, function (err, result) {
if (err) {
callback(err);
} else if (result.documents[0].ok != 1) {
Expand Down Expand Up @@ -643,7 +652,7 @@ Collection.prototype.findOne = function findOne (queryObject, options, callback)
, query
, fields);

this.db.executeCommand(queryCommand, function (err, result) {
this.db.executeCommand(queryCommand, true, function (err, result) {
if (!err && result.documents[0] && result.documents[0]['$err']) {
return callback(result.documents[0]['$err']);
}
Expand Down Expand Up @@ -763,7 +772,7 @@ Collection.prototype.mapReduce = function mapReduce (map, reduce, options, callb
var self = this;
var cmd = DbCommand.createDbCommand(this.db, mapCommandHash);

this.db.executeCommand(cmd, function (err, result) {
this.db.executeCommand(cmd, true, function (err, result) {
if (err) {
return callback(err);
}
Expand Down Expand Up @@ -880,7 +889,7 @@ Collection.prototype.group = function group (keys, condition, initial, reduce, c

var cmd = DbCommand.createDbCommand(this.db, selector);

this.db.executeCommand(cmd, function (err, result) {
this.db.executeCommand(cmd, true, function (err, result) {
if (err) return callback(err);

var document = result.documents[0];
Expand Down
15 changes: 15 additions & 0 deletions lib/mongodb/connections/repl_set_servers.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ var ReplSetServers = exports.ReplSetServers = function(servers, options) {
}
});

Object.defineProperty(this, "read", {
enumerable: true
, get: function () {
return this.secondaries.length > 0 ? this.secondaries[0] : null;
}
});

// Master connection property
Object.defineProperty(this, "primary", {
enumerable: true
Expand Down Expand Up @@ -317,6 +324,14 @@ ReplSetServers.prototype.connect = function(parent, callback) {
serverConnections.forEach(initServer);
}

ReplSetServers.prototype.checkoutWriter = function() {
return this.primary.connection;
}

ReplSetServers.prototype.checkoutReader = function() {
return this.primary.connection;
}

ReplSetServers.prototype.close = function() {
this.servers.forEach(function(server) {
server.close();
Expand Down
8 changes: 8 additions & 0 deletions lib/mongodb/connections/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ Server.prototype.connect = function(parent, callback) {
server.connection.open();
}

Server.prototype.checkoutWriter = function() {
return this.connection;
}

Server.prototype.checkoutReader = function() {
return this.connection;
}

Server.prototype.close = function() {
this.connection.close();
}
10 changes: 5 additions & 5 deletions lib/mongodb/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ Cursor.prototype.nextObject = function(callback) {
result = null;
};

self.db.executeCommand(self.generateQueryCommand(), commandHandler);
self.db.executeCommand(self.generateQueryCommand(), true, commandHandler);
commandHandler = null;
} catch(err) {
callback(new Error(err.toString()), null);
Expand Down Expand Up @@ -507,7 +507,7 @@ Cursor.prototype.getMore = function(callback) {
try {
var getMoreCommand = new GetMoreCommand(self.db, self.collectionName, self.limitRequest(), self.cursorId);
// Execute the command
self.db.executeCommand(getMoreCommand, function(err, result) {
self.db.executeCommand(getMoreCommand, true, function(err, result) {
self.cursorId = result.cursorId;
self.totalNumberOfRecords += result.numberReturned;
// Determine if there's more documents to fetch
Expand Down Expand Up @@ -582,7 +582,7 @@ Cursor.prototype.streamRecords = function(options) {
execute(queryCommand);

function execute(command) {
self.db.executeCommand(command, function(err,result) {
self.db.executeCommand(command, true, function(err,result) {
if (!self.queryRun && result) {
self.queryRun = true;
self.cursorId = result.cursorId;
Expand Down Expand Up @@ -632,12 +632,12 @@ Cursor.prototype.close = function(callback) {
if(this.cursorId instanceof self.db.bson_serializer.Long && this.cursorId.greaterThan(self.db.bson_serializer.Long.fromInt(0))) {
try {
var command = new KillCursorCommand(this.db, [this.cursorId]);
this.db.executeCommand(command, null);
this.db.executeCommand(command, true, null);
} catch(err) {}
}

this.cursorId = self.db.bson_serializer.Long.fromInt(0);
this.state = Cursor.CLOSED;
this.state = Cursor.CLOSED;

// callback for backward compatibility
if (callback) {
Expand Down
47 changes: 38 additions & 9 deletions lib/mongodb/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ var QueryCommand = require('./commands/query_command').QueryCommand,
MD5 = require('./crypto/md5').MD5,
EventEmitter = require('events').EventEmitter,
inherits = require('sys').inherits,
sys = require('sys');
sys = require('sys'),
debug = require('util').debug,
inspect = require('util').inspect;

var Db = exports.Db = function(databaseName, serverConfig, options) {
EventEmitter.call(this);
Expand Down Expand Up @@ -111,20 +113,21 @@ Db.prototype.collectionNames = function(collection_name, callback) {
/**
Fetch a specific collection (containing the actual collection information)
**/
Db.prototype.collection = function(collectionName, callback) {
Db.prototype.collection = function(collectionName, options, callback) {
var self = this;
if(typeof options === "function") { callback = options; options = {}; }

try {
if(self.strict) {
self.collectionNames(collectionName, function(err, collections) {
if(collections.length == 0) {
return callback(new Error("Collection " + collectionName + " does not exist. Currently in strict mode."), null);
} else {
return callback(null, new Collection(self, collectionName, self.pkFactory));
return callback(null, new Collection(self, collectionName, self.pkFactory, options));
}
});
} else {
return callback(null, new Collection(self, collectionName, self.pkFactory));
return callback(null, new Collection(self, collectionName, self.pkFactory, options));
}
} catch(err) {
return callback(err, null);
Expand Down Expand Up @@ -449,8 +452,12 @@ Db.prototype.dropDatabase = function(callback) {
/**
Execute db command
**/
Db.prototype.executeCommand = function(db_command, callback) {
Db.prototype.executeCommand = function(db_command, read, callback) {
var self = this;

var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
read = args.length ? args.shift() : false;

if(callback instanceof Function) {
// Add the callback to the list of callbacks by the request id (mapping outgoing messages to correct callbacks)
Expand All @@ -466,11 +473,23 @@ Db.prototype.executeCommand = function(db_command, callback) {
try{
if ( dbinstance.backup.server ) { // use slave this ONE time
self.notReplied[db_command.getRequestId().toString()] = dbinstance.backup.server.connection;
dbinstance.backup.server.connection.send( db_command);

// debug("========================================================= 1");

var writer = read ? dbinstance.backup.server.checkoutReader() : dbinstance.backup.server.checkoutWriter();
writer.send(db_command)

// dbinstance.backup.server.connection.send(db_command);
dbinstance.backup.server = null;
} else {
self.notReplied[db_command.getRequestId().toString()] = dbinstance.serverConfig.primary;
dbinstance.serverConfig.primary.send(db_command);

// debug("========================================================= 2");

var writer = read ? dbinstance.serverConfig.checkoutReader() : dbinstance.serverConfig.checkoutWriter();
writer.send(db_command)

// dbinstance.serverConfig.primary.send(db_command);
}
} catch ( err ) {
// Clean up callback if it exists
Expand All @@ -489,7 +508,13 @@ Db.prototype.executeCommand = function(db_command, callback) {
};

try{
self.serverConfig.primary.send(db_command);
// debug("========================================================= 3");
var writer = read ? self.serverConfig.checkoutReader() : self.serverConfig.checkoutWriter();
// debug(inspect(writer))

writer.send(db_command)

// self.serverConfig.primary.send(db_command);
} catch(err){
if(callback instanceof Function) {
delete self.notReplied[db_command.getRequestId().toString()];
Expand Down Expand Up @@ -638,7 +663,11 @@ Db.prototype.checkMaster_ = function(dbcopy, returnback) {
server.connection.on("close", function() { dbcopy.emit("close", this); });
}

server.connection.send(db_cmnd);
// debug("========================================================= 4");

var writer = read ? server.checkoutReader() : server.checkoutWriter();
writer.send(db_cmnd);
// server.connection.send(db_cmnd);
}
} else {
server.master = false;
Expand Down

0 comments on commit c4195d7

Please sign in to comment.