Skip to content

Close all connections on driver.close() #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public synchronized void reset()
@Override
public boolean isOpen()
{
return isOpen.get();
return isOpen.get() && connection.isOpen();
Copy link
Contributor

@zhenlineo zhenlineo Oct 21, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we should add connection.isOpen test here. Though it is handle, however this method before is just to test if session.close is ever called. If you want to change this to if this session could run more queries or something else, then make sure that other methods who calling this isOpen is not broken.

}

@Override
Expand Down Expand Up @@ -177,10 +177,6 @@ public void close()
{
connection.sync();
}
catch ( Throwable t )
{
throw t;
}
finally
{
closeConnection();
Expand Down Expand Up @@ -314,7 +310,7 @@ private void ensureConnectionIsOpen()

private void ensureSessionIsOpen()
{
if ( !isOpen() )
if ( !isOpen.get() )
{
throw new ClientException(
"No more interaction with this session is allowed " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright (c) 2002-2016 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.util.Supplier;

/**
* A blocking queue that also keeps track of connections that are acquired in order
* to facilitate termination of all connections.
*/
public class BlockingPooledConnectionQueue
{
/** The backing queue, keeps track of connections currently in queue */
private final BlockingQueue<PooledConnection> queue;

private final AtomicBoolean isTerminating = new AtomicBoolean( false );

/** Keeps track of acquired connections */
private final Set<PooledConnection> acquiredConnections =
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());

public BlockingPooledConnectionQueue( int capacity )
{
this.queue = new LinkedBlockingQueue<>( capacity );
}

/**
* Offer a connections back to the queue
*
* @param pooledConnection the connection to put back to the queue
* @return <code>true</code> if connections was accepted otherwise <code>false</code>
*/
public boolean offer( PooledConnection pooledConnection )
{
acquiredConnections.remove( pooledConnection );
boolean offer = queue.offer( pooledConnection );
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to synchronize here, i.e. making the move from queue to acquiredConnections atomic? It feels like you should but I am failing to come up with a scenario where it matters, so I opted for less synchronization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order should be first remove from acquiredConn, then put back to queue to avid the following scenario happening:
T1: queue.offer(connA)
T2: acquire() -> get connA // connA = queue.poll, acquiredConn.Add(connA)
T1: acquiredConn.remove(connA)

NOTE: Remove first, then add back. This rule applies to acquire too

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, whenever you add into the queue/acquiredConn, you probably want to test if the pool already terminated. if it is already terminated, then drop the conn directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ref: https://github.com/neo4j/neo4j-dotnet-driver/blob/1.1/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs
We used lock because we do not have a concurrent container before. We probably could have a lock free impl for java driver

// not added back to the queue, dispose of the connection
if (!offer) {
pooledConnection.dispose();
}
if (isTerminating.get()) {
PooledConnection poll = queue.poll();
if (poll != null)
{
poll.dispose();
}
}
return offer;
}

/**
* Acquire connection or create a new one if the queue is empty
* @param supplier used to create a new connection if queue is empty
* @return a PooledConnection instance
*/
public PooledConnection acquire( Supplier<PooledConnection> supplier )
{

PooledConnection poll = queue.poll();
if ( poll == null )
{
poll = supplier.get();
}
acquiredConnections.add( poll );

if (isTerminating.get()) {
acquiredConnections.remove( poll );
poll.dispose();
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
}
return poll;
}

public List<PooledConnection> toList()
{
return new ArrayList<>( queue );
}

public boolean isEmpty()
{
return queue.isEmpty();
}

public int size()
{
return queue.size();
}

public boolean contains( PooledConnection pooledConnection )
{
return queue.contains( pooledConnection );
}

/**
* Terminates all connections, both those that are currently in the queue as well
* as those that have been acquired.
*/
public void terminate()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You probably want a atomic var here at the very beginning of the method, to flag it that we've started termination. If any connection created after this, close it right now. If any connection want to add to the queue or map, drop them directly.

{
if (isTerminating.compareAndSet( false, true ))
{
while ( !queue.isEmpty() )
{
PooledConnection conn = queue.poll();
if ( conn != null )
{
//close the underlying connection without adding it back to the queue
conn.dispose();
}
}
for ( PooledConnection pooledConnection : acquiredConnections )
{
pooledConnection.dispose();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.util.Consumer;
Expand All @@ -30,49 +29,22 @@
*/
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
{
private final BlockingQueue<PooledConnection> connections;
private final AtomicBoolean driverStopped;
private final BlockingPooledConnectionQueue connections;
private final Function<PooledConnection, Boolean> validConnection;

PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
Function<PooledConnection, Boolean> validConnection)
{
this.connections = connections;
this.driverStopped = driverStopped;
this.validConnection = validConnection;
}

@Override
public void accept( PooledConnection pooledConnection )
{
if( driverStopped.get() )
if ( validConnection.apply( pooledConnection ) )
{
// if the driver already closed, then no need to try to return to pool, just directly close this connection
pooledConnection.dispose();
}
else if ( validConnection.apply( pooledConnection ) )
{
boolean released = connections.offer( pooledConnection );
if( !released )
{
// if the connection could be put back to the pool, then we let the pool to manage it.
// Otherwise, we close the connection directly here.
pooledConnection.dispose();
}
else if ( driverStopped.get() )
{
// If our adding the pooledConnection to the queue was racing with the closing of the driver,
// then the loop where the driver is closing all available connections might not observe our newly
// added connection. Thus, we must attempt to remove a connection and dispose it. It doesn't matter
// which connection we get back, because other threads might be in the same situation as ours. It only
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
// *a* connection in response.
PooledConnection conn = connections.poll();
if ( conn != null )
{
conn.dispose();
}
}
connections.offer( pooledConnection );
}
else
{
Expand Down
Loading