Skip to content

Commit

Permalink
Working on partial shard results
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Sep 23, 2012
1 parent 4ba1ddb commit 90b8311
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.tm_properties
data
node_modules/
lib-cov/
output
build
upload.py
5 changes: 5 additions & 0 deletions HISTORY
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
1.1.8
----------------
- Fixed db.eval to correctly handle system.js global javascript functions (Issue #709)
- Cleanup of non-closing connections (Issue #706)

1.1.7 2012-09-10
----------------
- Protect against starting PingStrategy being called more than once (Issue #694, https://github.com/aheckmann)
Expand Down
1 change: 1 addition & 0 deletions dev/tools/gleak.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ gleak.ignore('testFullSpec_param_found');
gleak.ignore('events');
gleak.ignore('TAP_Global_Harness');
gleak.ignore('Uint8ClampedArray');
gleak.ignore('_$jscoverage');

module.exports = gleak;
2 changes: 2 additions & 0 deletions lib/mongodb/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ var testForFields = {'limit' : 1, 'sort' : 1, 'fields' : 1, 'skip' : 1, 'hint' :
* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents.
* - **readPreference** {String}, the preferred read preference ((Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
* - **numberOfRetries** {Number, default:5}, if using awaidata specifies the number of times to retry on timeout.
* - **partial** {Boolean, default:false}, specify if the cursor should return partial results when querying against a sharded system
*
* @param {Object} query query object to locate the object to modify
* @param {Object} [options] additional options during update.
Expand Down Expand Up @@ -936,6 +937,7 @@ var normalizeHintField = function normalizeHintField(hint) {
* - **comment** {String}, You can put a $comment field on a query to make looking in the profiler logs simpler.
* - **raw** {Boolean, default:false}, Return all BSON documents as Raw Buffer documents.
* - **readPreference** {String}, the preferred read preference (Server.PRIMARY, Server.PRIMARY_PREFERRED, Server.SECONDARY, Server.SECONDARY_PREFERRED, Server.NEAREST).
* - **partial** {Boolean, default:false}, specify if the cursor should return partial results when querying against a sharded system
*
* @param {Object} query query object to locate the object to modify
* @param {Object} [options] additional options during update.
Expand Down
3 changes: 2 additions & 1 deletion lib/mongodb/commands/query_command.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,5 @@ QueryCommand.OPTS_SLAVE = 4;
QueryCommand.OPTS_OPLOG_REPLY = 8;
QueryCommand.OPTS_NO_CURSOR_TIMEOUT = 16;
QueryCommand.OPTS_AWAIT_DATA = 32;
QueryCommand.OPTS_EXHAUST = 64;
QueryCommand.OPTS_EXHAUST = 64;
QueryCommand.OPTS_PARTIAL = 128;
10 changes: 8 additions & 2 deletions lib/mongodb/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ var QueryCommand = require('./commands/query_command').QueryCommand,
* @param {Number} numberOfRetries if using awaidata specifies the number of times to retry on timeout.
* @param {String} dbName override the default dbName.
* @param {Number} tailableRetryInterval specify the miliseconds between getMores on tailable cursor.
* @param {Boolean} have the server send all the documents at once as getMore packets.
* @param {Boolean} exhaust have the server send all the documents at once as getMore packets.
* @param {Boolean} partial have the sharded system return a partial result from mongos.
*/
function Cursor(db, collection, selector, fields, skip, limit
, sort, hint, explain, snapshot, timeout, tailable, batchSize, slaveOk, raw, read
, returnKey, maxScan, min, max, showDiskLoc, comment, awaitdata, numberOfRetries, dbName, tailableRetryInterval, exhaust) {
, returnKey, maxScan, min, max, showDiskLoc, comment, awaitdata, numberOfRetries, dbName, tailableRetryInterval, exhaust, partial) {
this.db = db;
this.collection = collection;
this.selector = selector;
Expand Down Expand Up @@ -69,6 +70,7 @@ function Cursor(db, collection, selector, fields, skip, limit
this.comment = comment;
this.tailableRetryInterval = tailableRetryInterval || 100;
this.exhaust = exhaust || false;
this.partial = partial || false;

this.totalNumberOfRecords = 0;
this.items = [];
Expand Down Expand Up @@ -417,6 +419,10 @@ var generateQueryCommand = function(self) {
queryOptions |= QueryCommand.OPTS_SLAVE;
}

if(self.partial) {
queryOptions |= QueryCommand.OPTS_PARTIAL;
}

// limitValue of -1 is a special case used by Db#eval
var numberToReturn = self.limitValue == -1 ? -1 : limitRequest(self);

Expand Down
2 changes: 1 addition & 1 deletion lib/mongodb/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ Db.prototype.command = function(selector, options, callback) {
if(readPreference != false) {
if(selector['group'] || selector['aggregate'] || selector['collStats'] || selector['dbStats']
|| selector['count'] || selector['distinct'] || selector['geoNear'] || selector['geoSearch'] || selector['geoWalk']
|| (selector['mapreduce'] && selector.out = 'inline')) {
|| (selector['mapreduce'] && selector.out == 'inline')) {
// Set the read preference
cursor.setReadPreference(readPreference);
} else {
Expand Down
File renamed without changes.
142 changes: 142 additions & 0 deletions test/sharded/partial_read_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
var mongodb = process.env['TEST_NATIVE'] != null ? require('../../lib/mongodb').native() : require('../../lib/mongodb').pure();
var noReplicasetStart = process.env['NO_REPLICASET_START'] != null ? true : false;

var testCase = require('nodeunit').testCase,
debug = require('util').debug,
inspect = require('util').inspect,
gleak = require('../../dev/tools/gleak'),
ShardedManager = require('../tools/sharded_manager').ShardedManager,
Db = mongodb.Db,
Mongos = mongodb.Mongos,
ReadPreference = mongodb.ReadPreference,
Server = mongodb.Server;

// Keep instance of ReplicaSetManager
var serversUp = false;
var retries = 120;
var Shard = Shard == null ? null : Shard;

/**
* Retrieve the server information for the current
* instance of the db client
*
* @ignore
*/
exports.setUp = function(callback) {
Shard = new ShardedManager({
// A single replicaset in our sharded system
numberOfReplicaSets:2,
replPortRangeSet:30000,
// A single configuration server
numberOfConfigServers:1,
configPortRangeSet:40000,
// Two mongos proxies to ensure correct failover
numberOfMongosServers:2,
mongosRangeSet:50000,
// Collection and shard key setup
db:"sharded_test_db",
collection:"sharded_test_db_collection",
shardKey: "_id"
})

// Start the shard
Shard.start(function(err, result) {
Shard.shardDb("integration_test_", function(err, result) {
console.log("================================= sharded db")
console.dir(err)
console.dir(result)

Shard.shardCollection("integration_test_.shard_all_operations_test", {_id:1}, function(err, result) {
console.log("================================= sharded collection")
console.dir(err)
console.dir(result)

callback();
});
});
});
}

/**
* Retrieve the server information for the current
* instance of the db client
*
* @ignore
*/
exports.tearDown = function(callback) {
// Shard.killAll(function() {
callback();
// });
}

/**
* @ignore
*/
exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(test) {
console.log("____________________________________________________________________")
// Set up mongos connection
var mongos = new Mongos([
new Server("localhost", 50000, { auto_reconnect: true })
])

// Set up a bunch of documents
var docs = [];
for(var i = 0; i < 1000; i++) {
docs.push({a:i, data:new Buffer(1024)});
}

// Connect using the mongos connections
var db = new Db('integration_test_', mongos);
db.open(function(err, db) {
test.equal(null, err);
test.ok(db != null);

var collection = db.collection("shard_all_operations_test");
collection.insert(docs, {safe:{w:1, wtimeout:1000}}, function(err, result) {
test.equal(null, err);


Shard.killShard(function() {

collection.find({}, {partial:true}).toArray(function(err, items) {
// test.equal(null, err);
// test.ok(items.length > 0)
console.log("-------------------------------------------------------------")
console.dir(err)
console.dir(items)

db.close();
test.done();
});
});

// // Perform a find and each
// collection.find().each(function(err, item) {
// if(err) console.dir(err)

// if(item == null) {
// test.equal(1000, numberOfRecords);

// // Perform a find and each
// collection.find().toArray(function(err, items) {
// if(err) console.dir(err)
// test.equal(1000, items.length);

// db.close();
// test.done();
// })
// } else {
// numberOfRecords = numberOfRecords + 1;
// }
// });
});
});
}

/**
* Retrieve the server information for the current
* instance of the db client
*
* @ignore
*/
var numberOfTestsRun = Object.keys(this).length - 2;
24 changes: 23 additions & 1 deletion test/tools/replica_set_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,28 @@ ReplicaSetManager.prototype.kill = function(node, signal, options, callback) {
});
}

ReplicaSetManager.prototype.killSetServers = function(callback) {
var keys = Object.keys(this.mongods);
var totalKeys = keys.length;
var self = this;

var killCallback = function(_nodeKey) {
return function(err, result) {
console.log("====================================================== KILL")
console.dir(_nodeKey)

self.kill(_nodeKey, 9, function() {
totalKeys = totalKeys - 1;
if(totalKeys == 0) return callback(null, null);
})
}
}

for(var i = 0; i < keys.length; i++) {
killCallback(keys[i])();
}
}

ReplicaSetManager.prototype.killPrimary = function(signal, options, callback) {
var self = this;
// Unpack callback and variables
Expand Down Expand Up @@ -562,7 +584,7 @@ ReplicaSetManager.prototype.restart = start;

ReplicaSetManager.prototype.startCmd = function(n) {
// Create boot command
this.mongods[n]["start"] = "mongod --rest --noprealloc --smallfiles --replSet " + this.name + " --logpath '" + this.mongods[n]['log_path'] + "' " +
this.mongods[n]["start"] = "mongod --nojournal --oplogSize 1 --rest --noprealloc --smallfiles --replSet " + this.name + " --logpath '" + this.mongods[n]['log_path'] + "' " +
" --dbpath " + this.mongods[n]['db_path'] + " --port " + this.mongods[n]['port'] + " --fork";
this.mongods[n]["start"] = this.durable ? this.mongods[n]["start"] + " --dur" : this.mongods[n]["start"];

Expand Down
34 changes: 34 additions & 0 deletions test/tools/sharded_manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ ShardedManager.prototype.killAll = function(callback) {
});
}

// Kill a random shard
ShardedManager.prototype.killShard = function(callback) {
var replicasetServer = this.replicasetManagers.pop();
replicasetServer.killSetServers(callback);
}

// Kills the first server
ShardedManager.prototype.killMongoS = function(port, callback) {
// Locate the server instance and kill it
Expand All @@ -150,6 +156,34 @@ ShardedManager.prototype.restartMongoS = function(port, callback) {
}
}

// Shard a db
ShardedManager.prototype.shardDb = function(dbname, callback) {
if(this.mongosProxies.length == 0) throw new Error("need at least one mongos server");
// Set up the db connection
var db = new Db("admin", new Server("localhost", this.mongosRangeSet, {auto_reconnect: true, poolSize: 4}), {});
db.open(function(err, db) {
// Run the add shard commands
db.command({enablesharding:dbname}, function(err, result) {
db.close();
callback(err, result);
});
});
}

// Shard a db
ShardedManager.prototype.shardCollection = function(collectionName, key, callback) {
if(this.mongosProxies.length == 0) throw new Error("need at least one mongos server");
// Set up the db connection
var db = new Db("admin", new Server("localhost", this.mongosRangeSet, {auto_reconnect: true, poolSize: 4}), {});
db.open(function(err, db) {
// Run the add shard commands
db.command({shardcollection:collectionName, key:key}, function(err, result) {
db.close();
callback(err, result);
});
});
}

var setupShards = function(self, callback) {
if(self.mongosProxies.length == 0) throw new Error("need at least one mongos server");
// Set up the db connection
Expand Down

0 comments on commit 90b8311

Please sign in to comment.