Skip to content

Commit 8cfe635

Browse files
committed
Close all connections on driver.close()
Connections/Sessions that were in flight, i.e. acquired from the pool, were not closed when shutting down the driver, connections remained open until GC happened. The expected behavior is that `driver.close()` closes down all sessions it has created.
1 parent b282660 commit 8cfe635

File tree

8 files changed

+320
-83
lines changed

8 files changed

+320
-83
lines changed

driver/src/main/java/org/neo4j/driver/internal/NetworkSession.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public synchronized void reset()
139139
@Override
140140
public boolean isOpen()
141141
{
142-
return isOpen.get();
142+
return isOpen.get() && connection.isOpen();
143143
}
144144

145145
@Override
@@ -177,10 +177,6 @@ public void close()
177177
{
178178
connection.sync();
179179
}
180-
catch ( Throwable t )
181-
{
182-
throw t;
183-
}
184180
finally
185181
{
186182
closeConnection();
@@ -314,7 +310,7 @@ private void ensureConnectionIsOpen()
314310

315311
private void ensureSessionIsOpen()
316312
{
317-
if ( !isOpen() )
313+
if ( !isOpen.get() )
318314
{
319315
throw new ClientException(
320316
"No more interaction with this session is allowed " +
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/**
2+
* Copyright (c) 2002-2016 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.net.pooling;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Set;
25+
import java.util.concurrent.BlockingQueue;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.LinkedBlockingQueue;
28+
29+
import org.neo4j.driver.internal.util.Supplier;
30+
31+
/**
32+
* A blocking queue that also keeps track of connections that are acquired in order
33+
* to facilitate termination of all connections.
34+
*/
35+
public class BlockingPooledConnectionQueue
36+
{
37+
/** The backing queue, keeps track of connections currently in queue */
38+
private final BlockingQueue<PooledConnection> queue;
39+
40+
/** Keeps track of acquired connections */
41+
private final Set<PooledConnection> acquiredConnections =
42+
Collections.newSetFromMap(new ConcurrentHashMap<PooledConnection, Boolean>());
43+
44+
public BlockingPooledConnectionQueue( int capacity )
45+
{
46+
this.queue = new LinkedBlockingQueue<>( capacity );
47+
}
48+
49+
/**
50+
* Offer a connections back to the queue
51+
*
52+
* @param pooledConnection the connection to put back to the queue
53+
* @return <code>true</code> if connections was accepted otherwise <code>false</code>
54+
*/
55+
public boolean offer( PooledConnection pooledConnection )
56+
{
57+
boolean offer = queue.offer( pooledConnection );
58+
if ( offer )
59+
{
60+
acquiredConnections.remove( pooledConnection );
61+
}
62+
return offer;
63+
}
64+
65+
public List<PooledConnection> toList()
66+
{
67+
return new ArrayList<>( queue );
68+
}
69+
70+
public boolean isEmpty()
71+
{
72+
return queue.isEmpty();
73+
}
74+
75+
public int size()
76+
{
77+
return queue.size();
78+
}
79+
80+
public boolean contains( PooledConnection pooledConnection )
81+
{
82+
return queue.contains( pooledConnection );
83+
}
84+
85+
/**
86+
* Terminates all connections, both those that are currently in the queue as well
87+
* as those that have been acquired.
88+
*/
89+
public void terminate()
90+
{
91+
while ( !queue.isEmpty() )
92+
{
93+
PooledConnection conn = queue.poll();
94+
if ( conn != null )
95+
{
96+
//close the underlying connection without adding it back to the queue
97+
conn.dispose();
98+
}
99+
}
100+
for ( PooledConnection pooledConnection : acquiredConnections )
101+
{
102+
pooledConnection.dispose();
103+
}
104+
}
105+
106+
/**
107+
* Acquire connection or create a new one if the queue is empty
108+
* @param supplier used to create a new connection if queue is empty
109+
* @return a PooledConnection instance
110+
*/
111+
public PooledConnection acquire( Supplier<PooledConnection> supplier )
112+
{
113+
PooledConnection poll = queue.poll();
114+
if ( poll == null )
115+
{
116+
poll = supplier.get();
117+
}
118+
acquiredConnections.add( poll );
119+
return poll;
120+
}
121+
}

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledConnectionReleaseConsumer.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.concurrent.BlockingQueue;
2221
import java.util.concurrent.atomic.AtomicBoolean;
2322

2423
import org.neo4j.driver.internal.util.Consumer;
@@ -30,11 +29,11 @@
3029
*/
3130
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
3231
{
33-
private final BlockingQueue<PooledConnection> connections;
32+
private final BlockingPooledConnectionQueue connections;
3433
private final AtomicBoolean driverStopped;
3534
private final Function<PooledConnection, Boolean> validConnection;
3635

37-
PooledConnectionReleaseConsumer( BlockingQueue<PooledConnection> connections, AtomicBoolean driverStopped,
36+
PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections, AtomicBoolean driverStopped,
3837
Function<PooledConnection, Boolean> validConnection)
3938
{
4039
this.connections = connections;
@@ -67,11 +66,7 @@ else if ( driverStopped.get() )
6766
// which connection we get back, because other threads might be in the same situation as ours. It only
6867
// matters that we added *a* connection that might not be observed by the loop, and that we dispose of
6968
// *a* connection in response.
70-
PooledConnection conn = connections.poll();
71-
if ( conn != null )
72-
{
73-
conn.dispose();
74-
}
69+
connections.terminate();
7570
}
7671
}
7772
else

driver/src/main/java/org/neo4j/driver/internal/net/pooling/SocketConnectionPool.java

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,9 @@
1818
*/
1919
package org.neo4j.driver.internal.net.pooling;
2020

21-
import java.util.ArrayList;
2221
import java.util.List;
2322
import java.util.Map;
24-
import java.util.concurrent.BlockingQueue;
2523
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.LinkedBlockingQueue;
2724
import java.util.concurrent.atomic.AtomicBoolean;
2825

2926
import org.neo4j.driver.internal.ConnectionSettings;
@@ -35,6 +32,7 @@
3532
import org.neo4j.driver.internal.spi.Connection;
3633
import org.neo4j.driver.internal.spi.ConnectionPool;
3734
import org.neo4j.driver.internal.util.Clock;
35+
import org.neo4j.driver.internal.util.Supplier;
3836
import org.neo4j.driver.v1.AuthToken;
3937
import org.neo4j.driver.v1.AuthTokens;
4038
import org.neo4j.driver.v1.Logging;
@@ -48,10 +46,10 @@
4846
* try to return the session into the session pool, however if we failed to return it back, either because the pool
4947
* is full or the pool is being cleaned on driver.close, then we directly close the connection attached with the
5048
* session.
51-
*
49+
* <p>
5250
* The session is NOT meant to be thread safe, each thread should have an independent session and close it (return to
5351
* pool) when the work with the session has been done.
54-
*
52+
* <p>
5553
* The driver is thread safe. Each thread could try to get a session from the pool and then return it to the pool
5654
* at the same time.
5755
*/
@@ -60,7 +58,8 @@ public class SocketConnectionPool implements ConnectionPool
6058
/**
6159
* Pools, organized by server address.
6260
*/
63-
private final ConcurrentHashMap<BoltServerAddress,BlockingQueue<PooledConnection>> pools = new ConcurrentHashMap<>();
61+
private final ConcurrentHashMap<BoltServerAddress,BlockingPooledConnectionQueue> pools =
62+
new ConcurrentHashMap<>();
6463

6564
private final Clock clock = Clock.SYSTEM;
6665

@@ -73,7 +72,7 @@ public class SocketConnectionPool implements ConnectionPool
7372
private final AtomicBoolean stopped = new AtomicBoolean( false );
7473

7574
public SocketConnectionPool( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
76-
PoolSettings poolSettings, Logging logging )
75+
PoolSettings poolSettings, Logging logging )
7776
{
7877
this.connectionSettings = connectionSettings;
7978
this.securityPlan = securityPlan;
@@ -94,41 +93,48 @@ private Connection connect( BoltServerAddress address ) throws ClientException
9493

9594
private static Map<String,Value> tokenAsMap( AuthToken token )
9695
{
97-
if( token instanceof InternalAuthToken )
96+
if ( token instanceof InternalAuthToken )
9897
{
9998
return ((InternalAuthToken) token).toMap();
10099
}
101100
else
102101
{
103-
throw new ClientException( "Unknown authentication token, `" + token + "`. Please use one of the supported " +
102+
throw new ClientException(
103+
"Unknown authentication token, `" + token + "`. Please use one of the supported " +
104104
"tokens from `" + AuthTokens.class.getSimpleName() + "`." );
105105
}
106106
}
107107

108108
@Override
109-
public Connection acquire( BoltServerAddress address )
109+
public Connection acquire( final BoltServerAddress address )
110110
{
111111
if ( stopped.get() )
112112
{
113113
throw new IllegalStateException( "Pool has been closed, cannot acquire new values." );
114114
}
115-
BlockingQueue<PooledConnection> connections = pool( address );
116-
PooledConnection conn = connections.poll();
117-
if ( conn == null )
115+
final BlockingPooledConnectionQueue connections = pool( address );
116+
Supplier<PooledConnection> supplier = new Supplier<PooledConnection>()
118117
{
119-
conn = new PooledConnection( connect( address ), new
120-
PooledConnectionReleaseConsumer( connections, stopped, new PooledConnectionValidator( this, poolSettings ) ), clock );
121-
}
118+
@Override
119+
public PooledConnection get()
120+
{
121+
return new PooledConnection( connect( address ), new
122+
PooledConnectionReleaseConsumer( connections, stopped,
123+
new PooledConnectionValidator( SocketConnectionPool.this, poolSettings ) ), clock );
124+
125+
}
126+
};
127+
PooledConnection conn = connections.acquire( supplier );
122128
conn.updateTimestamp();
123129
return conn;
124130
}
125131

126-
private BlockingQueue<PooledConnection> pool( BoltServerAddress address )
132+
private BlockingPooledConnectionQueue pool( BoltServerAddress address )
127133
{
128-
BlockingQueue<PooledConnection> pool = pools.get( address );
134+
BlockingPooledConnectionQueue pool = pools.get( address );
129135
if ( pool == null )
130136
{
131-
pool = new LinkedBlockingQueue<>(poolSettings.maxIdleConnectionPoolSize());
137+
pool = new BlockingPooledConnectionQueue( poolSettings.maxIdleConnectionPoolSize() );
132138

133139
if ( pools.putIfAbsent( address, pool ) != null )
134140
{
@@ -142,19 +148,13 @@ private BlockingQueue<PooledConnection> pool( BoltServerAddress address )
142148
@Override
143149
public void purge( BoltServerAddress address )
144150
{
145-
BlockingQueue<PooledConnection> connections = pools.remove( address );
151+
BlockingPooledConnectionQueue connections = pools.remove( address );
146152
if ( connections == null )
147153
{
148154
return;
149155
}
150-
while (!connections.isEmpty())
151-
{
152-
PooledConnection connection = connections.poll();
153-
if ( connection != null)
154-
{
155-
connection.dispose();
156-
}
157-
}
156+
157+
connections.terminate();
158158
}
159159

160160
@Override
@@ -166,41 +166,34 @@ public boolean hasAddress( BoltServerAddress address )
166166
@Override
167167
public void close()
168168
{
169-
if( !stopped.compareAndSet( false, true ) )
169+
if ( !stopped.compareAndSet( false, true ) )
170170
{
171171
// already closed or some other thread already started close
172172
return;
173173
}
174174

175-
for ( BlockingQueue<PooledConnection> pool : pools.values() )
175+
for ( BlockingPooledConnectionQueue pool : pools.values() )
176176
{
177-
while ( !pool.isEmpty() )
178-
{
179-
PooledConnection conn = pool.poll();
180-
if ( conn != null )
181-
{
182-
//close the underlying connection without adding it back to the queue
183-
conn.dispose();
184-
}
185-
}
177+
pool.terminate();
186178
}
187179

188180
pools.clear();
189181
}
190182

183+
191184
//for testing
192-
public List<PooledConnection> connectionsForAddress(BoltServerAddress address)
185+
public List<PooledConnection> connectionsForAddress( BoltServerAddress address )
193186
{
194-
LinkedBlockingQueue<PooledConnection> pooledConnections =
195-
(LinkedBlockingQueue<PooledConnection>) pools.get( address );
196-
if (pooledConnections == null)
187+
BlockingPooledConnectionQueue pooledConnections = pools.get( address );
188+
if ( pooledConnections == null )
197189
{
198190
return emptyList();
199191
}
200192
else
201193
{
202-
return new ArrayList<>( pooledConnections );
194+
return pooledConnections.toList();
203195
}
204196
}
205197

198+
206199
}

0 commit comments

Comments
 (0)