Skip to content

ReplicaSetStatus: added feature to remove nodes dynamically #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions src/main/com/mongodb/DBApiLayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,28 @@ void _cleanCursors()
throws MongoException {

List<Long> l = null;

// check without synchronisation ( double check pattern will avoid having two threads do the cleanup )
// maybe the whole cleanCursor logic should be moved to a background thread anyway
int sz = _deadCursorIds.size();

if ( sz == 0 )
return;

if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL )
return;

synchronized ( _deadCursorIdsLock ){
int sz = _deadCursorIds.size();
sz = _deadCursorIds.size();

if ( _deadCursorIds.size() == 0 )
if ( sz == 0 )
return;

if ( sz % 20 != 0 && sz < NUM_CURSORS_BEFORE_KILL )
return;

l = _deadCursorIds;
_deadCursorIds = new Vector<Long>();
_deadCursorIds = new LinkedList<Long>(); // replaced Vector by LinkedList because Vector is synchronized ( which should not be necessary in our current use cases )
}

Bytes.LOGGER.info( "going to kill cursors : " + l.size() );
Expand Down
1 change: 1 addition & 0 deletions src/main/com/mongodb/DBCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public DBCursor copy() {
c._numWanted = _numWanted;
c._skip = _skip;
c._options = _options;
c._batchSize = _batchSize;
if ( _specialFields != null )
c._specialFields = new BasicDBObject( _specialFields.toMap() );
return c;
Expand Down
34 changes: 33 additions & 1 deletion src/main/com/mongodb/DBPort.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ void say( OutMessage msg )

private synchronized Response go( OutMessage msg , DBCollection coll )
throws IOException {
return go( msg , coll , false );
}

private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse )
throws IOException {

if ( _processingResponse ){
if ( coll == null ){
Expand All @@ -88,7 +93,7 @@ private synchronized Response go( OutMessage msg , DBCollection coll )
if ( _pool != null )
_pool._everWorked = true;

if ( coll == null )
if ( coll == null && ! forceReponse )
return null;

_processingResponse = true;
Expand Down Expand Up @@ -132,6 +137,33 @@ synchronized CommandResult runCommand( DB db , DBObject cmd ) {
return (CommandResult)res;
}

synchronized DBObject findOne( String ns , DBObject q ){
OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null );

try {
Response res = go( msg , null , true );
if ( res.size() == 0 )
return null;
if ( res.size() > 1 )
throw new MongoInternalException( "something is wrong. size:" + res.size() );
return res.get(0);
}
catch ( IOException ioe ){
throw new MongoInternalException( "DBPort.findOne failed" , ioe );
}

}

synchronized CommandResult runCommand( String db , DBObject cmd ) {
DBObject res = findOne( db + ".$cmd" , cmd );
if ( res == null )
throw new MongoInternalException( "something is wrong, no command result" );
CommandResult cr = new CommandResult();
cr.putAll( res );
return cr;
}


synchronized CommandResult tryGetLastError( DB db , long last, WriteConcern concern){
if ( last != _calls )
return null;
Expand Down
7 changes: 2 additions & 5 deletions src/main/com/mongodb/DBTCPConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ class DBTCPConnector implements DBConnector {

public DBTCPConnector( Mongo m , ServerAddress addr )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( addr );

_createLogger.info( addr.toString() );

if ( addr.isPaired() ){
_allHosts = new ArrayList<ServerAddress>( addr.explode() );
_rsStatus = new ReplicaSetStatus( m , _allHosts , this );
_rsStatus = new ReplicaSetStatus( _allHosts );
_createLogger.info( "switching to replica set mode : " + _allHosts + " -> " + _curMaster );
}
else {
Expand All @@ -58,12 +57,11 @@ public DBTCPConnector( Mongo m , ServerAddress ... all )

public DBTCPConnector( Mongo m , List<ServerAddress> all )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( all );

_allHosts = new ArrayList<ServerAddress>( all ); // make a copy so it can't be modified
_rsStatus = new ReplicaSetStatus( m , _allHosts , this );
_rsStatus = new ReplicaSetStatus( _allHosts );

_createLogger.info( all + " -> " + _curMaster );
}
Expand Down Expand Up @@ -353,7 +351,6 @@ public void close(){
_rsStatus.close();
}

final Mongo _mongo;
private ServerAddress _curMaster;
private DBPortPool _curPortPool;
private DBPortPool.Holder _portHolder;
Expand Down
95 changes: 69 additions & 26 deletions src/main/com/mongodb/Mongo.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,31 +77,6 @@ public static DB connect( DBAddress addr ){
return new Mongo( addr ).getDB( addr.getDBName() );
}

public static Mongo getStaticMongo( String host )
throws UnknownHostException , MongoException {
return getStaticMongo( host , null );
}

private static final MongoOptions _defaultOptions = new MongoOptions();

public static Mongo getStaticMongo( String host , MongoOptions options )
throws UnknownHostException , MongoException {

final String key = host + "-" + options;

Mongo m = _mongos.get( key );
if ( m != null )
return m;

m = new Mongo( host , options == null ? _defaultOptions : options );
Mongo temp = _mongos.putIfAbsent( key , m );
if ( temp != null ){
m.close();
return temp;
}
return m;
}

public Mongo()
throws UnknownHostException , MongoException {
this( new ServerAddress() );
Expand Down Expand Up @@ -215,6 +190,30 @@ public Mongo( List<ServerAddress> replicaSetSeeds , MongoOptions options )
_connector.checkMaster();
}

public Mongo( MongoURI uri )
throws MongoException , UnknownHostException {

_options = uri.getOptions();

if ( uri.getHosts().size() == 1 ){
_addr = new ServerAddress( uri.getHosts().get(0) );
_addrs = null;
_connector = new DBTCPConnector( this , _addr );
}
else {
List<ServerAddress> replicaSetSeeds = new ArrayList<ServerAddress>( uri.getHosts().size() );
for ( String host : uri.getHosts() )
replicaSetSeeds.add( new ServerAddress( host ) );
_addr = null;
_addrs = replicaSetSeeds;
_connector = new DBTCPConnector( this , replicaSetSeeds );
}

_connector.checkMaster();


}

public DB getDB( String dbname ){

DB db = _dbs.get( dbname );
Expand Down Expand Up @@ -357,5 +356,49 @@ protected PoolOutputBuffer createNew(){
};


private static final ConcurrentMap<String,Mongo> _mongos = new ConcurrentHashMap<String,Mongo>();
// -------


/**
* Mongo.Holder is if you want to have a static place to hold instances of Mongo
* security is not enforced at this level, so need to do on your side
*/
public static class Holder {

public Mongo connect( MongoURI uri )
throws MongoException , UnknownHostException {

String key = _toKey( uri );

Mongo m = _mongos.get(key);
if ( m != null )
return m;

m = new Mongo( uri );

Mongo temp = _mongos.putIfAbsent( key , m );
if ( temp == null ){
// ours got in
return m;
}

// there was a race and we lost
// close ours and return the other one
m.close();
return temp;
}

String _toKey( MongoURI uri ){
StringBuilder buf = new StringBuilder();
for ( String h : uri.getHosts() )
buf.append( h ).append( "," );
buf.append( uri.getOptions() );
buf.append( uri.getUsername() );
return buf.toString();
}


private static final ConcurrentMap<String,Mongo> _mongos = new ConcurrentHashMap<String,Mongo>();

}
}
32 changes: 32 additions & 0 deletions src/main/com/mongodb/MongoURI.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

package com.mongodb;

import java.net.*;
import java.util.*;

public class MongoURI {
Expand Down Expand Up @@ -112,11 +113,42 @@ public String getCollection(){
return _collection;
}

public MongoOptions getOptions(){
return _options;
}

public Mongo connect()
throws MongoException , UnknownHostException {
// TODO caching?
return new Mongo( this );
}

public DB connectDB()
throws MongoException , UnknownHostException {
// TODO auth
return connect().getDB( _database );
}

public DB connectDB( Mongo m ){
// TODO auth
return m.getDB( _database );
}

public DBCollection connectCollection( DB db ){
return db.getCollection( _collection );
}

public DBCollection connectCollection( Mongo m ){
return connectDB( m ).getCollection( _collection );
}

// ---------------------------------

final String _username;
final char[] _password;
final List<String> _hosts;
final String _database;
final String _collection;

final MongoOptions _options = new MongoOptions();
}
10 changes: 5 additions & 5 deletions src/main/com/mongodb/OutMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ static OutMessage query( Mongo m , int options , String ns , int numToSkip , int

OutMessage( Mongo m ){
_mongo = m;
_buffer = m._bufferPool.get();
_buffer = _mongo == null ? new PoolOutputBuffer() : _mongo._bufferPool.get();
set( _buffer );
}

Expand Down Expand Up @@ -166,11 +166,11 @@ byte[] toByteArray(){
}

void doneWithMessage(){
if ( _buffer != null ){
if ( _buffer != null && _mongo != null )
_mongo._bufferPool.done( _buffer );
_buffer = null;
_mongo = null;
}

_buffer = null;
_mongo = null;
}

boolean hasOption( int option ){
Expand Down
Loading