Skip to content

Commit

Permalink
Ensure open/fullsetup command executes correctly returning the righ d…
Browse files Browse the repository at this point in the history
…b for server/replicaset and mongos #1044
  • Loading branch information
christkv committed Jul 30, 2013
1 parent 7c5c55b commit 6012ec5
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 42 deletions.
8 changes: 2 additions & 6 deletions lib/mongodb/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ DbStore.prototype.emit = function(event, message, object, reset, filterDb, rethr
if(db.listeners(event).length > 0) {
if(filterDb == null || filterDb.databaseName !== db.databaseName
|| filterDb.tag !== db.tag) {
db.emit(event, message, object);
db.emit(event, message, object == null ? db : object);
emitted = true;
}
}
Expand All @@ -212,7 +212,7 @@ DbStore.prototype.emit = function(event, message, object, reset, filterDb, rethr
if(this._dbs[i].listeners(event).length > 0) {
if(filterDb == null || filterDb.databaseName !== this._dbs[i].databaseName
|| filterDb.tag !== this._dbs[i].tag) {
this._dbs[i].emit(event, message, object);
this._dbs[i].emit(event, message, object == null ? this._dbs[i] : object);
emitted = true;
}
}
Expand Down Expand Up @@ -369,12 +369,8 @@ Base.prototype._callHandler = function(id, document, err) {
try {
callback(err, document, info.connection);
} catch(err) {
// self.emit("error", err, null, info.connection)
self._emitAcrossAllDbInstances(self, null, "error", err, self, true, true);
}

// // Emit to the callback of the object
// this._callBackStore.emit(id, err, document, info.connection);
}
}

Expand Down
16 changes: 13 additions & 3 deletions lib/mongodb/connection/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ var ReadPreference = require('./read_preference').ReadPreference
, Base = require('./base').Base
, Server = require('./server').Server
, format = require('util').format
, timers = require('timers')
, inherits = require('util').inherits;

// Set processor, setImmediate if 0.10 otherwise nextTick
var processor = timers.setImmediate ? timers.setImmediate : process.nextTick;

/**
* Mongos constructor provides a connection to a mongos proxy including failover to additional servers
*
Expand Down Expand Up @@ -124,8 +128,9 @@ Mongos.prototype.connect = function(db, options, callback) {

// Emit the open event
if(self.emitOpen)
self.db.emit("open", null, self.db);

self._emitAcrossAllDbInstances(self, null, "open", null, null, null);

self._emitAcrossAllDbInstances(self, null, "fullsetup", null, null, null);
// Callback
callback(null, self.db);
}
Expand All @@ -149,7 +154,7 @@ Mongos.prototype.connect = function(db, options, callback) {

// Emit close across all the attached db instances
if(Object.keys(self.upServers).length == 0) {
self._dbStore.emit("close", new Error("mongos disconnected, no valid proxies contactable over tcp"), null, true);
self._emitAcrossAllDbInstances(self, null, "close", new Error("mongos disconnected, no valid proxies contactable over tcp"), null, null);
}
}
}
Expand Down Expand Up @@ -490,6 +495,11 @@ Mongos.prototype.close = function(callback) {
if(self._replicasetTimeoutId != null) clearInterval(self._replicasetTimeoutId);
self._replicasetTimeoutId = null;

// Emit close event
processor(function() {
self._emitAcrossAllDbInstances(self, null, "close", null, null, true)
});

// Close all the up servers
for(var name in this.upServers) {
this.upServers[name].close(function(err, result) {
Expand Down
6 changes: 3 additions & 3 deletions lib/mongodb/connection/repl_set/repl_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ ReplSet.prototype.connect = function(parent, options, callback) {
// Emit fullsetup
processor(function() {
if(self.emitOpen)
parent.emit("open", null, self.options.db, self);
parent.emit("fullsetup", null, self.options.db, self);
self._emitAcrossAllDbInstances(self, null, "open", null, null, null);

self._emitAcrossAllDbInstances(self, null, "fullsetup", null, null, null);
});

// If we have a strategy defined start it
Expand Down
2 changes: 1 addition & 1 deletion lib/mongodb/connection/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ Server.prototype.connect = function(dbInstance, options, callback) {
_server.isMasterDoc = reply.documents[0];

if(self.emitOpen)
_server._emitAcrossAllDbInstances(_server, eventReceiver, "open", null, returnIsMasterResults ? reply : dbInstance, null);
_server._emitAcrossAllDbInstances(_server, eventReceiver, "open", null, returnIsMasterResults ? reply : null, null);

// If we have it set to returnIsMasterResults
if(returnIsMasterResults) {
Expand Down
33 changes: 33 additions & 0 deletions test/tests/functional/connection_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,37 @@ exports.shouldCorrectlyDoSimpleCountExamplesWithUrl = function(configuration, te
test.done();
});
// DOC_END
}

/**
* @ignore
*/
exports.shouldCorrectlyReturnTheRightDbObjectOnOpenEmit = function(configuration, test) {
var db_conn = configuration.newDbInstance({w:1}, {poolSize:1, auto_reconnect:false});
var db2 = db_conn.db("test2");

db2.on('open', function (err, db) {
test.equal(db2.databaseName, db.databaseName);
});

db_conn.on('open', function (err, db) {
test.equal(db_conn.databaseName, db.databaseName);
});

db_conn.open(function (err) {
if(err) throw err;
var col1 = db_conn.collection('test');
var col2 = db2.collection('test');

var testData = { value : "something" };
col1.insert(testData, function (err) {
if (err) throw err;
col2.insert(testData, function (err) {
if (err) throw err;
db2.close();
console.log("done");
test.done();
});
});
});
}
55 changes: 26 additions & 29 deletions test/tests/repl_set/connecting_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,11 @@ exports['Should emit close no callback'] = function(configuration, test) {
);

new Db('integration_test_', replSet, {w:0}).open(function(err, db) {
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 1")
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 1")
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 1")
// console.dir(err)

test.equal(null, err);
var dbCloseCount = 0, serverCloseCount = 0;
db.on('close', function() { ++dbCloseCount; });

// Force a close on a socket
// db.serverConfig._state.addresses[replicasetManager.host + ":" + replicasetManager.ports[0]].connectionPool.openConnections[0].connection.destroy();
db.close();

setTimeout(function() {
Expand Down Expand Up @@ -317,13 +312,6 @@ exports['Should connect with primary stepped down'] = function(configuration, te

new Db('integration_test_', replSet, {w:0}).open(function(err, p_db) {
test.ok(err == null);
// console.log("====================================================")
// console.dir(Object.keys(p_db.serverConfig._state.addresses))
// console.dir(Object.keys(p_db.serverConfig._state.secondaries))
// console.dir(p_db.serverConfig._state.master != null)
// if(p_db.serverConfig._state.master)
// console.dir(p_db.serverConfig._state.master.isConnected())

test.equal(true, p_db.serverConfig.isConnected());

p_db.close();
Expand Down Expand Up @@ -394,10 +382,6 @@ exports['Should connect with secondary node killed'] = function(configuration, t

var db = new Db('integration_test_', replSet, {w:0});
db.open(function(err, p_db) {
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 0")
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 0")
// console.log("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 0")
// console.dir(err)
test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());

Expand Down Expand Up @@ -550,34 +534,42 @@ exports['Should correctly emit all signals even if not yet connected'] = functio
);

var db_conn = new Db('integration_test_', replSet, {w:1});
var db2 = db_conn.db(db_conn.databaseName);
var db2 = db_conn.db('integration_test_2');
var close_count = 0;
var open_count = 0;
var fullsetup_count = 0;

db2.on('close', function () {
db2.on('close', function() {
close_count = close_count + 1;
});

db_conn.on('close', function () {
db_conn.on('close', function() {
close_count = close_count + 1;
});

db2.on('open', function () {
db2.on('open', function(err, db) {
// console.log("============================================= open 1 :: " + db.databaseName)
test.equal('integration_test_2', db.databaseName);
open_count = open_count + 1;
});
});

db_conn.on('open', function () {
db_conn.on('open', function(err, db) {
// console.log("============================================= open 2 :: " + db.databaseName)
test.equal('integration_test_', db.databaseName);
open_count = open_count + 1;
});
});

db2.on('fullsetup', function () {
db2.on('fullsetup', function(err, db) {
// console.log("============================================= fullsetup 1 :: " + db.databaseName)
test.equal('integration_test_2', db.databaseName);
fullsetup_count = fullsetup_count + 1;
});
});

db_conn.on('fullsetup', function () {
db_conn.on('fullsetup', function(err, db) {
// console.log("============================================= fullsetup 2 :: " + db.databaseName)
test.equal('integration_test_', db.databaseName);
fullsetup_count = fullsetup_count + 1;
});
});

db_conn.open(function (err) {
if (err) throw err;
Expand All @@ -594,11 +586,16 @@ exports['Should correctly emit all signals even if not yet connected'] = functio
if (err) throw err;
db2.close(function() {
setTimeout(function() {
// console.log("========================================= results")
// console.dir("close_count :: " + close_count)
// console.dir("open_count :: " + open_count)
// console.dir("fullsetup_count :: " + fullsetup_count)

test.equal(2, close_count);
test.equal(2, open_count);
test.equal(2, fullsetup_count);
test.done();
}, 100);
}, 1000);
});
});
});
Expand Down
87 changes: 87 additions & 0 deletions test/tests/sharded/mongoclient_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,91 @@ exports['Should correctly connect with a missing mongos'] = function(configurati
test.done();
}, 2000)
});
}

/**
* @ignore
*/
exports['Should correctly emit open and fullsetup to all db instances'] = function(configuration, test) {
var Mongos = configuration.getMongoPackage().Mongos
, Server = configuration.getMongoPackage().Server
, Db = configuration.getMongoPackage().Db;

var db_conn = new Db('integration_test_', new Mongos([new Server("localhost", 50000), new Server("localhost", 50001)]), {w:1});
var db2 = db_conn.db('integration_test_2');

var close_count = 0;
var open_count = 0;
var fullsetup_count = 0;

db2.on('close', function() {
close_count = close_count + 1;
});

db_conn.on('close', function() {
close_count = close_count + 1;
});

db2.on('open', function(err, db) {
// console.log("============================================= open 1 :: " + db.databaseName)
test.equal('integration_test_2', db.databaseName);
open_count = open_count + 1;
});

db_conn.on('open', function(err, db) {
// console.log("============================================= open 2 :: " + db.databaseName)
test.equal('integration_test_', db.databaseName);
open_count = open_count + 1;
});

db2.on('fullsetup', function(err, db) {
// console.log("============================================= fullsetup 1 :: " + db.databaseName)
test.equal('integration_test_2', db.databaseName);
fullsetup_count = fullsetup_count + 1;
});

db_conn.on('fullsetup', function(err, db) {
// console.log("============================================= fullsetup 2 :: " + db.databaseName)
test.equal('integration_test_', db.databaseName);
fullsetup_count = fullsetup_count + 1;
});

db_conn.open(function (err) {
if (err) throw err;

var col1 = db_conn.collection('test');
var col2 = db2.collection('test');

var testData = { value : "something" };
col1.insert(testData, function (err) {
if (err) throw err;

var testData = { value : "something" };
col2.insert(testData, function (err) {
if (err) throw err;
db2.close(function() {
setTimeout(function() {
// console.log("========================================= results")
// console.dir("close_count :: " + close_count)
// console.dir("open_count :: " + open_count)
// console.dir("fullsetup_count :: " + fullsetup_count)

test.equal(2, close_count);
test.equal(2, open_count);
test.equal(2, fullsetup_count);
test.done();
}, 1000);
});
});
});
});

// MongoClient.connect('mongodb://localhost:50002,localhost:50000,localhost:50001/test', {}, function(err, db) {
// setTimeout(function() {
// test.equal(null, err);
// test.ok(db != null);
// db.close();
// test.done();
// }, 2000)
// });
}

0 comments on commit 6012ec5

Please sign in to comment.