-
Notifications
You must be signed in to change notification settings - Fork 155
Close all connections on driver.close() #252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/** | ||
* Copyright (c) 2002-2016 "Neo Technology," | ||
* Network Engine for Objects in Lund AB [http://neotechnology.com] | ||
* | ||
* This file is part of Neo4j. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.neo4j.driver.internal.net.pooling; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
import org.neo4j.driver.internal.util.Supplier; | ||
|
||
/** | ||
* A blocking queue that also keeps track of connections that are acquired in order | ||
* to facilitate termination of all connections. | ||
*/ | ||
public class BlockingPooledConnectionQueue | ||
{ | ||
/** The backing queue, keeps track of connections currently in queue */ | ||
private final BlockingQueue<PooledConnection> queue; | ||
|
||
private final AtomicBoolean isTerminating = new AtomicBoolean( false ); | ||
|
||
/** Keeps track of acquired connections */ | ||
private final Set<PooledConnection> acquiredConnections = | ||
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>()); | ||
|
||
public BlockingPooledConnectionQueue( int capacity ) | ||
{ | ||
this.queue = new LinkedBlockingQueue<>( capacity ); | ||
} | ||
|
||
/** | ||
* Offer a connections back to the queue | ||
* | ||
* @param pooledConnection the connection to put back to the queue | ||
* @return <code>true</code> if connections was accepted otherwise <code>false</code> | ||
*/ | ||
public boolean offer( PooledConnection pooledConnection ) | ||
{ | ||
acquiredConnections.remove( pooledConnection ); | ||
boolean offer = queue.offer( pooledConnection ); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it necessary to synchronize here, i.e. making the move from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The order should be first remove from NOTE: Remove first, then add back. This rule applies to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, whenever you add into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ref: https://github.com/neo4j/neo4j-dotnet-driver/blob/1.1/Neo4j.Driver/Neo4j.Driver/Internal/ConnectionPool.cs |
||
// not added back to the queue, dispose of the connection | ||
if (!offer) { | ||
pooledConnection.dispose(); | ||
} | ||
if (isTerminating.get()) { | ||
PooledConnection poll = queue.poll(); | ||
if (poll != null) | ||
{ | ||
poll.dispose(); | ||
} | ||
} | ||
return offer; | ||
} | ||
|
||
/** | ||
* Acquire connection or create a new one if the queue is empty | ||
* @param supplier used to create a new connection if queue is empty | ||
* @return a PooledConnection instance | ||
*/ | ||
public PooledConnection acquire( Supplier<PooledConnection> supplier ) | ||
{ | ||
|
||
PooledConnection poll = queue.poll(); | ||
if ( poll == null ) | ||
{ | ||
poll = supplier.get(); | ||
} | ||
acquiredConnections.add( poll ); | ||
|
||
if (isTerminating.get()) { | ||
acquiredConnections.remove( poll ); | ||
poll.dispose(); | ||
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." ); | ||
} | ||
return poll; | ||
} | ||
|
||
public List<PooledConnection> toList() | ||
{ | ||
return new ArrayList<>( queue ); | ||
} | ||
|
||
public boolean isEmpty() | ||
{ | ||
return queue.isEmpty(); | ||
} | ||
|
||
public int size() | ||
{ | ||
return queue.size(); | ||
} | ||
|
||
public boolean contains( PooledConnection pooledConnection ) | ||
{ | ||
return queue.contains( pooledConnection ); | ||
} | ||
|
||
/** | ||
* Terminates all connections, both those that are currently in the queue as well | ||
* as those that have been acquired. | ||
*/ | ||
public void terminate() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You probably want a atomic var here at the very beginning of the method, to flag it that we've started termination. If any connection created after this, close it right now. If any connection want to add to the queue or map, drop them directly. |
||
{ | ||
if (isTerminating.compareAndSet( false, true )) | ||
{ | ||
while ( !queue.isEmpty() ) | ||
{ | ||
PooledConnection conn = queue.poll(); | ||
if ( conn != null ) | ||
{ | ||
//close the underlying connection without adding it back to the queue | ||
conn.dispose(); | ||
} | ||
} | ||
for ( PooledConnection pooledConnection : acquiredConnections ) | ||
{ | ||
pooledConnection.dispose(); | ||
} | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if we should add connection.isOpen test here. Though it is handle, however this method before is just to test if
session.close
is ever called. If you want to change this toif this session could run more queries
or something else, then make sure that other methods who calling thisisOpen
is not broken.