Skip to content

Commit a9ed77e

Browse files
authored
Merge pull request #387 from lutovich/1.5-close-broken-idle-conns
Improve disposal of broken connections
2 parents dbc4912 + 6efeb84 commit a9ed77e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1562
-1122
lines changed

driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import java.net.URI;
2323
import java.security.GeneralSecurityException;
2424

25-
import org.neo4j.driver.internal.cluster.LoadBalancer;
2625
import org.neo4j.driver.internal.cluster.RoutingContext;
2726
import org.neo4j.driver.internal.cluster.RoutingSettings;
27+
import org.neo4j.driver.internal.cluster.loadbalancing.LeastConnectedLoadBalancingStrategy;
28+
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancer;
29+
import org.neo4j.driver.internal.cluster.loadbalancing.LoadBalancingStrategy;
30+
import org.neo4j.driver.internal.cluster.loadbalancing.RoundRobinLoadBalancingStrategy;
2831
import org.neo4j.driver.internal.net.BoltServerAddress;
2932
import org.neo4j.driver.internal.net.SocketConnector;
3033
import org.neo4j.driver.internal.net.pooling.PoolSettings;
@@ -146,7 +149,21 @@ protected InternalDriver createDriver( Config config, SecurityPlan securityPlan,
146149
protected LoadBalancer createLoadBalancer( BoltServerAddress address, ConnectionPool connectionPool, Config config,
147150
RoutingSettings routingSettings )
148151
{
149-
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging() );
152+
return new LoadBalancer( address, routingSettings, connectionPool, createClock(), config.logging(),
153+
createLoadBalancingStrategy( config, connectionPool ) );
154+
}
155+
156+
private LoadBalancingStrategy createLoadBalancingStrategy( Config config, ConnectionPool connectionPool )
157+
{
158+
switch ( config.loadBalancingStrategy() )
159+
{
160+
case ROUND_ROBIN:
161+
return new RoundRobinLoadBalancingStrategy( config.logging() );
162+
case LEAST_CONNECTED:
163+
return new LeastConnectedLoadBalancingStrategy( connectionPool, config.logging() );
164+
default:
165+
throw new IllegalArgumentException( "Unknown load balancing strategy: " + config.loadBalancingStrategy() );
166+
}
150167
}
151168

152169
/**

driver/src/main/java/org/neo4j/driver/internal/cluster/RoundRobinAddressSet.java renamed to driver/src/main/java/org/neo4j/driver/internal/cluster/AddressSet.java

Lines changed: 7 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,39 +20,23 @@
2020

2121
import java.util.Arrays;
2222
import java.util.Set;
23-
import java.util.concurrent.atomic.AtomicInteger;
2423

2524
import org.neo4j.driver.internal.net.BoltServerAddress;
2625

27-
public class RoundRobinAddressSet
26+
public class AddressSet
2827
{
2928
private static final BoltServerAddress[] NONE = {};
30-
private final AtomicInteger offset = new AtomicInteger();
31-
private volatile BoltServerAddress[] addresses = NONE;
3229

33-
public int size()
34-
{
35-
return addresses.length;
36-
}
30+
private volatile BoltServerAddress[] addresses = NONE;
3731

38-
public BoltServerAddress next()
32+
public BoltServerAddress[] toArray()
3933
{
40-
BoltServerAddress[] addresses = this.addresses;
41-
if ( addresses.length == 0 )
42-
{
43-
return null;
44-
}
45-
return addresses[next( addresses.length )];
34+
return addresses;
4635
}
4736

48-
int next( int divisor )
37+
public int size()
4938
{
50-
int index = offset.getAndIncrement();
51-
for ( ; index == Integer.MAX_VALUE; index = offset.getAndIncrement() )
52-
{
53-
offset.compareAndSet( Integer.MIN_VALUE, index % divisor );
54-
}
55-
return index % divisor;
39+
return addresses.length;
5640
}
5741

5842
public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
@@ -132,12 +116,6 @@ public synchronized void remove( BoltServerAddress address )
132116
@Override
133117
public String toString()
134118
{
135-
return "RoundRobinAddressSet=" + Arrays.toString( addresses );
136-
}
137-
138-
/** breaking encapsulation in order to perform white-box testing of boundary case */
139-
void setOffset( int target )
140-
{
141-
offset.set( target );
119+
return "AddressSet=" + Arrays.toString( addresses );
142120
}
143121
}

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterComposition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.neo4j.driver.v1.Value;
2727
import org.neo4j.driver.v1.util.Function;
2828

29-
final class ClusterComposition
29+
public final class ClusterComposition
3030
{
3131
private static final long MAX_TTL = Long.MAX_VALUE / 1000L;
3232
private static final Function<Value,BoltServerAddress> OF_BoltServerAddress =
@@ -53,7 +53,7 @@ private ClusterComposition( long expirationTimestamp )
5353
}
5454

5555
/** For testing */
56-
ClusterComposition(
56+
public ClusterComposition(
5757
long expirationTimestamp,
5858
Set<BoltServerAddress> readers,
5959
Set<BoltServerAddress> writers,

driver/src/main/java/org/neo4j/driver/internal/cluster/ClusterRoutingTable.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public class ClusterRoutingTable implements RoutingTable
3636

3737
private final Clock clock;
3838
private volatile long expirationTimeout;
39-
private final RoundRobinAddressSet readers;
40-
private final RoundRobinAddressSet writers;
41-
private final RoundRobinAddressSet routers;
39+
private final AddressSet readers;
40+
private final AddressSet writers;
41+
private final AddressSet routers;
4242

4343
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
4444
{
@@ -51,9 +51,9 @@ private ClusterRoutingTable( Clock clock )
5151
this.clock = clock;
5252
this.expirationTimeout = clock.millis() - 1;
5353

54-
this.readers = new RoundRobinAddressSet();
55-
this.writers = new RoundRobinAddressSet();
56-
this.routers = new RoundRobinAddressSet();
54+
this.readers = new AddressSet();
55+
this.writers = new AddressSet();
56+
this.routers = new AddressSet();
5757
}
5858

5959
@Override
@@ -85,27 +85,21 @@ public synchronized void forget( BoltServerAddress address )
8585
}
8686

8787
@Override
88-
public RoundRobinAddressSet readers()
88+
public AddressSet readers()
8989
{
9090
return readers;
9191
}
9292

9393
@Override
94-
public RoundRobinAddressSet writers()
94+
public AddressSet writers()
9595
{
9696
return writers;
9797
}
9898

9999
@Override
100-
public BoltServerAddress nextRouter()
100+
public AddressSet routers()
101101
{
102-
return routers.next();
103-
}
104-
105-
@Override
106-
public int routerSize()
107-
{
108-
return routers.size();
102+
return routers;
109103
}
110104

111105
@Override

driver/src/main/java/org/neo4j/driver/internal/cluster/Rediscovery.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,10 @@ private ClusterComposition lookupOnInitialRouterThenOnKnownRouters( RoutingTable
136136
private ClusterComposition lookupOnKnownRouters( RoutingTable routingTable, ConnectionPool connections,
137137
Set<BoltServerAddress> seenServers )
138138
{
139-
int size = routingTable.routerSize();
140-
for ( int i = 0; i < size; i++ )
141-
{
142-
BoltServerAddress address = routingTable.nextRouter();
143-
if ( address == null )
144-
{
145-
break;
146-
}
139+
BoltServerAddress[] addresses = routingTable.routers().toArray();
147140

141+
for ( BoltServerAddress address : addresses )
142+
{
148143
ClusterComposition composition = lookupOnRouter( address, routingTable, connections );
149144
if ( composition != null )
150145
{

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@
3535

3636
import static java.lang.String.format;
3737

38-
class RoutingPooledConnection implements PooledConnection
38+
public class RoutingPooledConnection implements PooledConnection
3939
{
4040
private final PooledConnection delegate;
4141
private final RoutingErrorHandler errorHandler;
4242
private final AccessMode accessMode;
4343

44-
RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
44+
public RoutingPooledConnection( PooledConnection delegate, RoutingErrorHandler errorHandler, AccessMode accessMode )
4545
{
4646
this.delegate = delegate;
4747
this.errorHandler = errorHandler;

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingTable.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,11 @@ public interface RoutingTable
3131

3232
void forget( BoltServerAddress address );
3333

34-
RoundRobinAddressSet readers();
34+
AddressSet readers();
3535

36-
RoundRobinAddressSet writers();
36+
AddressSet writers();
3737

38-
BoltServerAddress nextRouter();
39-
40-
int routerSize();
38+
AddressSet routers();
4139

4240
void removeWriter( BoltServerAddress toRemove );
4341
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster.loadbalancing;
20+
21+
import org.neo4j.driver.internal.net.BoltServerAddress;
22+
import org.neo4j.driver.internal.spi.ConnectionPool;
23+
import org.neo4j.driver.v1.Logger;
24+
import org.neo4j.driver.v1.Logging;
25+
26+
/**
27+
* Load balancing strategy that finds server with least amount of active (checked out of the pool) connections from
28+
* given readers or writers. It finds a start index for iteration in a round-robin fashion. This is done to prevent
29+
* choosing same first address over and over when all addresses have same amount of active connections.
30+
*/
31+
public class LeastConnectedLoadBalancingStrategy implements LoadBalancingStrategy
32+
{
33+
private static final String LOGGER_NAME = LeastConnectedLoadBalancingStrategy.class.getSimpleName();
34+
35+
private final RoundRobinArrayIndex readersIndex = new RoundRobinArrayIndex();
36+
private final RoundRobinArrayIndex writersIndex = new RoundRobinArrayIndex();
37+
38+
private final ConnectionPool connectionPool;
39+
private final Logger log;
40+
41+
public LeastConnectedLoadBalancingStrategy( ConnectionPool connectionPool, Logging logging )
42+
{
43+
this.connectionPool = connectionPool;
44+
this.log = logging.getLog( LOGGER_NAME );
45+
}
46+
47+
@Override
48+
public BoltServerAddress selectReader( BoltServerAddress[] knownReaders )
49+
{
50+
return select( knownReaders, readersIndex, "reader" );
51+
}
52+
53+
@Override
54+
public BoltServerAddress selectWriter( BoltServerAddress[] knownWriters )
55+
{
56+
return select( knownWriters, writersIndex, "writer" );
57+
}
58+
59+
private BoltServerAddress select( BoltServerAddress[] addresses, RoundRobinArrayIndex addressesIndex,
60+
String addressType )
61+
{
62+
int size = addresses.length;
63+
if ( size == 0 )
64+
{
65+
log.trace( "Unable to select %s, no known addresses given", addressType );
66+
return null;
67+
}
68+
69+
// choose start index for iteration in round-rodin fashion
70+
int startIndex = addressesIndex.next( size );
71+
int index = startIndex;
72+
73+
BoltServerAddress leastConnectedAddress = null;
74+
int leastActiveConnections = Integer.MAX_VALUE;
75+
76+
// iterate over the array to find least connected address
77+
do
78+
{
79+
BoltServerAddress address = addresses[index];
80+
int activeConnections = connectionPool.activeConnections( address );
81+
82+
if ( activeConnections < leastActiveConnections )
83+
{
84+
leastConnectedAddress = address;
85+
leastActiveConnections = activeConnections;
86+
}
87+
88+
// loop over to the start of the array when end is reached
89+
if ( index == size - 1 )
90+
{
91+
index = 0;
92+
}
93+
else
94+
{
95+
index++;
96+
}
97+
}
98+
while ( index != startIndex );
99+
100+
log.trace( "Selected %s with address: '%s' and active connections: %s",
101+
addressType, leastConnectedAddress, leastActiveConnections );
102+
103+
return leastConnectedAddress;
104+
}
105+
}

0 commit comments

Comments
 (0)