Skip to content

Commit

Permalink
Only execute callback if provided in base.js
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Aug 6, 2013
1 parent 02a5723 commit 9b90933
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 92 deletions.
1 change: 1 addition & 0 deletions HISTORY
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
1.3.16 2013-08-02
-----------------
- Fixes connection issue where lots of connections would happen if a server is in recovery mode during connection (Issue #1050, NODE-50, NODE-51)
- Bug in unlink multi filename (Issue #1054)

1.3.15 2013-08-01
-----------------
Expand Down
4 changes: 2 additions & 2 deletions lib/mongodb/connection/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,9 @@ Base.prototype._callHandler = function(id, document, err) {
// Force key deletion because it nulling it not deleting in 0.10.X
if(this._callBackStore._events && this._callBackStore._events[id] != null) delete this._callBackStore._events[id];

// Execute the callback
try {
callback(err, document, info.connection);
// Execute the callback if one was provided
if(typeof callback == 'function') callback(err, document, info.connection);
} catch(err) {
self._emitAcrossAllDbInstances(self, null, "error", err, self, true, true);
}
Expand Down
109 changes: 21 additions & 88 deletions lib/mongodb/responses/mongo_reply.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,99 +42,32 @@ MongoReply.prototype.parseHeader = function(binary_reply, bson) {

MongoReply.prototype.parseBody = function(binary_reply, bson, raw, callback) {
raw = raw == null ? false : raw;
// Just set a doc limit for deserializing
var docLimitSize = 1024*20;

// If our message length is very long, let's switch to process.nextTick for messages
if(this.messageLength > docLimitSize) {
var batchSize = this.numberReturned;
this.documents = new Array(this.numberReturned);

// Just walk down until we get a positive number >= 1
for(var i = 50; i > 0; i--) {
if((this.numberReturned/i) >= 1) {
batchSize = i;
break;
try {
// Let's unpack all the bson documents, deserialize them and store them
for(var object_index = 0; object_index < this.numberReturned; object_index++) {
var _options = {promoteLongs: bson.promoteLongs};

// Read the size of the bson object
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;

// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize), _options));
}

// Adjust binary index to point to next block of binary bson data
this.index = this.index + bsonObjectSize;
}

// Actual main creator of the processFunction setting internal state to control the flow
var parseFunction = function(_self, _binary_reply, _batchSize, _numberReturned) {
var object_index = 0;
// Internal loop process that will use nextTick to ensure we yield some time
var processFunction = function() {
// Adjust batchSize if we have less results left than batchsize
if((_numberReturned - object_index) < _batchSize) {
_batchSize = _numberReturned - object_index;
}

// If raw just process the entries
if(raw) {
// Iterate over the batch
for(var i = 0; i < _batchSize; i++) {
// Are we done ?
if(object_index <= _numberReturned) {
// Read the size of the bson object
var bsonObjectSize = _binary_reply[_self.index] | _binary_reply[_self.index + 1] << 8 | _binary_reply[_self.index + 2] << 16 | _binary_reply[_self.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
_self.documents[object_index] = binary_reply.slice(_self.index, _self.index + bsonObjectSize);
// Adjust binary index to point to next block of binary bson data
_self.index = _self.index + bsonObjectSize;
// Update number of docs parsed
object_index = object_index + 1;
}
}
} else {
try {
// Parse documents
_self.index = bson.deserializeStream(binary_reply
, _self.index
, _batchSize
, _self.documents
, object_index
, {promoteLongs: bson.promoteLongs});
// Adjust index
object_index = object_index + _batchSize;
} catch (err) {
return callback(err);
}
}

// If we have more documents process NextTick
if(object_index < _numberReturned) {
processor(processFunction);
} else {
callback(null);
}
}

// Return the process function
return processFunction;
}(this, binary_reply, batchSize, this.numberReturned)();
} else {
try {
// Let's unpack all the bson documents, deserialize them and store them
for(var object_index = 0; object_index < this.numberReturned; object_index++) {
var _options = {promoteLongs: bson.promoteLongs};
// Read the size of the bson object
var bsonObjectSize = binary_reply[this.index] | binary_reply[this.index + 1] << 8 | binary_reply[this.index + 2] << 16 | binary_reply[this.index + 3] << 24;
// If we are storing the raw responses to pipe straight through
if(raw) {
// Deserialize the object and add to the documents array
this.documents.push(binary_reply.slice(this.index, this.index + bsonObjectSize));
} else {
// Deserialize the object and add to the documents array
this.documents.push(bson.deserialize(binary_reply.slice(this.index, this.index + bsonObjectSize), _options));
}
// Adjust binary index to point to next block of binary bson data
this.index = this.index + bsonObjectSize;
}
} catch(err) {
return callback(err);
}


// No error return
callback(null);
} catch(err) {
return callback(err);
}
}

Expand Down
12 changes: 10 additions & 2 deletions test/tests/manual_tests/manual_ha_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ var replSet = new ReplSetServers([
}
);

// RS = new ReplicaSetManager({name:"testappset", retries:120, secondary_count:2, passive_count:1, arbiter_count:1});
// RS = new ReplicaSetManager({name:"testappset", retries:120, secondary_count:2, passive_count:0, arbiter_count:0, auth:true});
// RS.startSet(true, function(err, result) {
// process.exit(0)
// if(err != null) throw err;

// setInterval(function() {
Expand All @@ -42,7 +43,14 @@ var replSet = new ReplSetServers([
//opens the database
// var db = new Db('testapp', replSet);
// db.open(function(err) {
MongoClient.connect("mongodb://mallory:a@localhost:30000,localhost:30001,localhost:30002/foo?authSource=users&readPreference=primary", function(err, db) {
MongoClient.connect("mongodb://a:a@localhost:30000,localhost:30001,localhost:30002/foo?authSource=admin&readPreference=primary", {
replSet: {
socketOptions: {connectTimeoutMS: 1000, socketTimeoutMS: 1000}
},
server: {
socketOptions: {connectTimeoutMS: 1000, socketTimeoutMS: 1000}
}
}, function(err, db) {
if (err) return console.log('database open error %o', err);
console.log('database opened');

Expand Down

0 comments on commit 9b90933

Please sign in to comment.