Skip to content

New pool #190

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal;

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.Logger;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void run()
};

private InternalTransaction currentTransaction;
private boolean isOpen = true;
private AtomicBoolean isOpen = new AtomicBoolean( true );

public InternalSession( Connection connection, Logger logger )
{
Expand Down Expand Up @@ -100,19 +101,19 @@ public StatementResult run( Statement statement )
@Override
public boolean isOpen()
{
return isOpen;
return isOpen.get();
}

@Override
public void close()
{
if( !isOpen )
// Use atomic operation to protect from closing the connection twice (putting back to the pool twice).
if( !isOpen.compareAndSet( true, false ) )
{
throw new ClientException( "This session has already been closed." );
}
else
{
isOpen = false;
if ( currentTransaction != null )
{
try
Expand All @@ -124,8 +125,14 @@ public void close()
// Best-effort
}
}
connection.sync();
connection.close();
try
{
connection.sync();
}
finally
{
connection.close();
}
}
}

Expand Down Expand Up @@ -171,7 +178,7 @@ private void ensureConnectionIsValidBeforeOpeningTransaction()
@Override
protected void finalize() throws Throwable
{
if( isOpen )
if( isOpen.compareAndSet( true, false ) )
{
logger.error( "Neo4j Session object leaked, please ensure that your application calls the `close` " +
"method on Sessions before disposing of the objects.", null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,16 @@ public void receiveOne()

@Override
public void close()
{try
{
markAsInUse();
delegate.close();
}
finally
{
markAsAvailable();
}
try
{
markAsInUse();
delegate.close();
}
finally
{
markAsAvailable();
}
}

@Override
Expand Down
41 changes: 0 additions & 41 deletions driver/src/main/java/org/neo4j/driver/internal/pool/Allocator.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.connector.socket.SocketConnector;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.internal.spi.Connector;
import org.neo4j.driver.internal.util.Clock;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.AuthToken;
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.exceptions.ClientException;
Expand All @@ -41,16 +42,16 @@
import static java.lang.String.format;

/**
* A basic connection pool that optimizes for threads being long-lived, acquiring/releasing many connections.
* It uses a global queue as a fallback pool, but tries to avoid coordination by storing connections in a ThreadLocal.
* The pool is designed to buffer certain amount of free sessions into session pool. When closing a session, we first
* try to return the session into the session pool, however if we failed to return it back, either because the pool
* is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the
* session.
*
* Safety is achieved by tracking thread locals getting garbage collected, returning connections to the global pool
* when this happens.
* The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to
* pool) when the work with the session has been done.
*
* If threads are long-lived, this pool will achieve linearly scalable performance with overhead equivalent to a
* hash-map lookup per acquire.
*
* If threads are short-lived, this pool is not ideal.
* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
* at the same time.
*/
public class InternalConnectionPool implements ConnectionPool
{
Expand All @@ -62,36 +63,26 @@ public class InternalConnectionPool implements ConnectionPool
/**
* Pools, organized by URL.
*/
private final ConcurrentHashMap<URI,ThreadCachingPool<PooledConnection>> pools = new ConcurrentHashMap<>();

/**
* Connections that fail this criteria will be disposed of.
*/
private final ValidationStrategy<PooledConnection> connectionValidation;
private final ConcurrentHashMap<URI,BlockingQueue<PooledConnection>> pools = new ConcurrentHashMap<>();

private final AuthToken authToken;
/**
* Timeout in milliseconds if there are no available sessions.
*/
private final long acquireSessionTimeout;

private final Clock clock;
private final Config config;

/** Shutdown flag */
private final AtomicBoolean stopped = new AtomicBoolean( false );

public InternalConnectionPool( Config config, AuthToken authToken )
{
this( loadConnectors(), Clock.SYSTEM, config, authToken,
Long.getLong( "neo4j.driver.acquireSessionTimeout", 30_000 ) );
this( loadConnectors(), Clock.SYSTEM, config, authToken);
}

public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config config,
AuthToken authToken, long acquireTimeout )
AuthToken authToken )
{
this.authToken = authToken;
this.acquireSessionTimeout = acquireTimeout;
this.config = config;
this.clock = clock;
this.connectionValidation = new PooledConnectionValidator( config.idleTimeBeforeConnectionTest() );
for ( Connector connector : conns )
{
for ( String s : connector.supportedSchemes() )
Expand All @@ -104,37 +95,37 @@ public InternalConnectionPool( Collection<Connector> conns, Clock clock, Config
@Override
public Connection acquire( URI sessionURI )
{
try
if ( stopped.get() )
{
Connection conn = pool( sessionURI ).acquire( acquireSessionTimeout, TimeUnit.MILLISECONDS );
if ( conn == null )
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
}
BlockingQueue<PooledConnection> connections = pool( sessionURI );
PooledConnection conn = connections.poll();
if ( conn == null )
{
Connector connector = connectors.get( sessionURI.getScheme() );
if ( connector == null )
{
throw new ClientException(
"Failed to acquire a session with Neo4j " +
"as all the connections in the connection pool are already occupied by other sessions. " +
"Please close unused session and retry. " +
"Current Pool size: " + config.connectionPoolSize() +
". If your application requires running more sessions concurrently than the current pool " +
"size, you should create a driver with a larger connection pool size." );
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
sessionURI.getScheme(), sessionURI, connectorSchemes() ) );
}
return conn;
}
catch ( InterruptedException e )
{
throw new ClientException( "Interrupted while waiting for a connection to Neo4j." );
conn = new PooledConnection(connector.connect( sessionURI, config, authToken ), new
PooledConnectionReleaseConsumer( connections, stopped, config ), clock);
}
conn.updateUsageTimestamp();
return conn;
}

private ThreadCachingPool<PooledConnection> pool( URI sessionURI )
private BlockingQueue<PooledConnection> pool( URI sessionURI )
{
ThreadCachingPool<PooledConnection> pool = pools.get( sessionURI );
BlockingQueue<PooledConnection> pool = pools.get( sessionURI );
if ( pool == null )
{
pool = newPool( sessionURI );
pool = new LinkedBlockingQueue<>(config.maxIdleConnectionPoolSize());
if ( pools.putIfAbsent( sessionURI, pool ) != null )
{
// We lost a race to create the pool, dispose of the one we created, and recurse
pool.close();
return pool( sessionURI );
}
}
Expand All @@ -161,48 +152,30 @@ private static Collection<Connector> loadConnectors()
@Override
public void close() throws Neo4jException
{
for ( ThreadCachingPool<PooledConnection> pool : pools.values() )
if( !stopped.compareAndSet( false, true ) )
{
pool.close();
// already closed or some other thread already started close
return;
}
pools.clear();
}

private String connectorSchemes()
{
return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) );
}

private ThreadCachingPool<PooledConnection> newPool( final URI uri )
{

return new ThreadCachingPool<>( config.connectionPoolSize(), new Allocator<PooledConnection>()
for ( BlockingQueue<PooledConnection> pool : pools.values() )
{
@Override
public PooledConnection allocate( Consumer<PooledConnection> release )
while ( !pool.isEmpty() )
{
Connector connector = connectors.get( uri.getScheme() );
if ( connector == null )
PooledConnection conn = pool.poll();
if ( conn != null )
{
throw new ClientException(
format( "Unsupported URI scheme: '%s' in url: '%s'. Supported transports are: '%s'.",
uri.getScheme(), uri, connectorSchemes() ) );
//close the underlying connection without adding it back to the queue
conn.dispose();
}
Connection conn = connector.connect( uri, config, authToken );
return new PooledConnection( conn, release );
}

@Override
public void onDispose( PooledConnection pooledConnection )
{
pooledConnection.dispose();
}
}

@Override
public void onAcquire( PooledConnection pooledConnection )
{
pools.clear();
}

}
}, connectionValidation, clock );
private String connectorSchemes()
{
return Arrays.toString( connectors.keySet().toArray( new String[connectors.keySet().size()] ) );
}
}
Loading