Skip to content

Commit

Permalink
Changed server to use prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Nov 11, 2014
1 parent e388905 commit 4fc366c
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 134 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pids
*.seed
*.tmp
*.dat
*.png

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
Expand Down
1 change: 1 addition & 0 deletions .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*.log
*.heapsnapshot
*.dat
*.png

HISTORY
Readme.md
Expand Down
295 changes: 161 additions & 134 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,191 +118,220 @@ var Server = function(host, port, options) {
// Server capabilities
var sCapabilities = null;

// Define the internal properties
this.s = {
// Create an instance of a server instance from mongodb-core
server: server
// Server capabilities
, sCapabilities: null
// Cloned options
, clonedOptions: clonedOptions
// Reconnect
, reconnect: reconnect
// Emit error
, emitError: emitError
// Pool size
, poolSize: poolSize
// Store Options
, storeOptions: storeOptions
// Store
, store: store
// Host
, host: host
// Port
, port: port
// Options
, options: options
}

// BSON property
Object.defineProperty(this, 'bson', {
enumerable: true, get: function() {
return server.bson;
return self.s.server.bson;
}
});

// Last ismaster
Object.defineProperty(this, 'isMasterDoc', {
enumerable:true, get: function() {
return server.lastIsMaster();
return self.s.server.lastIsMaster();
}
});

// Last ismaster
Object.defineProperty(this, 'poolSize', {
enumerable:true, get: function() { return server.connections().length; }
enumerable:true, get: function() { return self.s.server.connections().length; }
});

Object.defineProperty(this, 'autoReconnect', {
enumerable:true, get: function() { return reconnect; }
enumerable:true, get: function() { return self.s.reconnect; }
});

Object.defineProperty(this, 'host', {
enumerable:true, get: function() { return host; }
enumerable:true, get: function() { return self.s.host; }
});

Object.defineProperty(this, 'port', {
enumerable:true, get: function() { return port; }
enumerable:true, get: function() { return self.s.port; }
});
}

this.parserType = function() {
return server.parserType();
}

// Connect
this.connect = function(db, _options, callback) {
if('function' === typeof _options) callback = _options, _options = {};
if(_options == null) _options = {};
if(!('function' === typeof callback)) callback = null;
options = _options;

// Update bufferMaxEntries
storeOptions.bufferMaxEntries = db.bufferMaxEntries;

// Error handler
var connectErrorHandler = function(event) {
return function(err) {
// Remove all event handlers
var events = ['timeout', 'error', 'close'];
events.forEach(function(e) {
self.removeListener(e, connectErrorHandler);
});

server.removeListener('connect', connectErrorHandler);

// Try to callback
try {
callback(err);
} catch(err) {
process.nextTick(function() { throw err; })
}
}
}

// Actual handler
var errorHandler = function(event) {
return function(err) {
if(event != 'error') {
self.emit(event, err);
}
}
}
inherits(Server, EventEmitter);

// Error handler
var reconnectHandler = function(err) {
self.emit('reconnect', self);
store.execute();
}
Server.prototype.parserType = function() {
return this.s.server.parserType();
}

// Connect handler
var connectHandler = function() {
// Clear out all the current handlers left over
["timeout", "error", "close"].forEach(function(e) {
server.removeAllListeners(e);
// Connect
Server.prototype.connect = function(db, _options, callback) {
var self = this;
if('function' === typeof _options) callback = _options, _options = {};
if(_options == null) _options = {};
if(!('function' === typeof callback)) callback = null;
self.s.options = _options;

// Update bufferMaxEntries
self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;

// Error handler
var connectErrorHandler = function(event) {
return function(err) {
// Remove all event handlers
var events = ['timeout', 'error', 'close'];
events.forEach(function(e) {
self.removeListener(e, connectErrorHandler);
});

// Set up listeners
server.once('timeout', errorHandler('timeout'));
server.once('error', errorHandler('error'));
server.once('close', errorHandler('close'));

// Emit open event
self.emit('open', null, self);
self.s.server.removeListener('connect', connectErrorHandler);

// Return correctly
// Try to callback
try {
callback(null, self);
callback(err);
} catch(err) {
process.nextTick(function() { throw err; })
}
}
}

// Set up listeners
server.once('timeout', connectErrorHandler('timeout'));
server.once('error', connectErrorHandler('error'));
server.once('close', connectErrorHandler('close'));
server.once('connect', connectHandler);
// Reconnect server
server.on('reconnect', reconnectHandler);

// Start connection
server.connect(_options);
}

// Server capabilities
this.capabilities = function() {
if(sCapabilities) return sCapabilities;
sCapabilities = new ServerCapabilities(server.lastIsMaster());
return sCapabilities;
// Actual handler
var errorHandler = function(event) {
return function(err) {
if(event != 'error') {
self.emit(event, err);
}
}
}

// Command
this.command = function(ns, cmd, options, callback) {
server.command(ns, cmd, options, callback);
// Error handler
var reconnectHandler = function(err) {
self.emit('reconnect', self);
self.s.store.execute();
}

// Insert
this.insert = function(ns, ops, options, callback) {
server.insert(ns, ops, options, callback);
}
// Connect handler
var connectHandler = function() {
// Clear out all the current handlers left over
["timeout", "error", "close"].forEach(function(e) {
self.s.server.removeAllListeners(e);
});

// Update
this.update = function(ns, ops, options, callback) {
server.update(ns, ops, options, callback);
// Set up listeners
self.s.server.once('timeout', errorHandler('timeout'));
self.s.server.once('error', errorHandler('error'));
self.s.server.once('close', errorHandler('close'));

// Emit open event
self.emit('open', null, self);

// Return correctly
try {
callback(null, self);
} catch(err) {
process.nextTick(function() { throw err; })
}
}

// Remove
this.remove = function(ns, ops, options, callback) {
server.remove(ns, ops, options, callback);
}
// Set up listeners
self.s.server.once('timeout', connectErrorHandler('timeout'));
self.s.server.once('error', connectErrorHandler('error'));
self.s.server.once('close', connectErrorHandler('close'));
self.s.server.once('connect', connectHandler);
// Reconnect server
self.s.server.on('reconnect', reconnectHandler);

// IsConnected
this.isConnected = function() {
return server.isConnected();
}
// Start connection
self.s.server.connect(_options);
}

// Insert
this.cursor = function(ns, cmd, options) {
options.disconnectHandler = store;
return server.cursor(ns, cmd, options);
}
// Server capabilities
Server.prototype.capabilities = function() {
if(this.s.sCapabilities) return this.s.sCapabilities;
this.s.sCapabilities = new ServerCapabilities(this.s.server.lastIsMaster());
return this.s.sCapabilities;
}

this.setBSONParserType = function(type) {
return server.setBSONParserType(type);
}
// Command
Server.prototype.command = function(ns, cmd, options, callback) {
this.s.server.command(ns, cmd, options, callback);
}

this.lastIsMaster = function() {
return server.lastIsMaster();
}
// Insert
Server.prototype.insert = function(ns, ops, options, callback) {
this.s.server.insert(ns, ops, options, callback);
}

this.close = function(forceClosed) {
server.destroy();
// We need to wash out all stored processes
if(forceClosed == true) {
storeOptions.force = forceClosed;
store.flush();
}
}
// Update
Server.prototype.update = function(ns, ops, options, callback) {
this.s.server.update(ns, ops, options, callback);
}

// Remove
Server.prototype.remove = function(ns, ops, options, callback) {
this.s.server.remove(ns, ops, options, callback);
}

// IsConnected
Server.prototype.isConnected = function() {
return this.s.server.isConnected();
}

// Insert
Server.prototype.cursor = function(ns, cmd, options) {
options.disconnectHandler = this.s.store;
return this.s.server.cursor(ns, cmd, options);
}

Server.prototype.setBSONParserType = function(type) {
return this.s.server.setBSONParserType(type);
}

Server.prototype.lastIsMaster = function() {
return this.s.server.lastIsMaster();
}

this.auth = function() {
var args = Array.prototype.slice.call(arguments, 0);
server.auth.apply(server, args);
Server.prototype.close = function(forceClosed) {
this.s.server.destroy();
// We need to wash out all stored processes
if(forceClosed == true) {
this.s.storeOptions.force = forceClosed;
this.s.store.flush();
}
}

/**
* All raw connections
* @method
* @return {array}
*/
this.connections = function() {
return server.connections();
}
Server.prototype.auth = function() {
var args = Array.prototype.slice.call(arguments, 0);
this.s.server.auth.apply(this.s.server, args);
}

/**
* All raw connections
* @method
* @return {array}
*/
Server.prototype.connections = function() {
return this.s.server.connections();
}

/**
* Server connect event
*
Expand Down Expand Up @@ -345,6 +374,4 @@ var Server = function(host, port, options) {
* @type {object}
*/

inherits(Server, EventEmitter);

module.exports = Server;

0 comments on commit 4fc366c

Please sign in to comment.