Skip to content

Relax connection termination policy in routing driver #424

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ClusterRoutingTable implements RoutingTable
public ClusterRoutingTable( Clock clock, BoltServerAddress... routingAddresses )
{
this( clock );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ), new HashSet<BoltServerAddress>() );
routers.update( new LinkedHashSet<>( asList( routingAddresses ) ));
}

private ClusterRoutingTable( Clock clock )
Expand All @@ -65,15 +65,31 @@ public boolean isStaleFor( AccessMode mode )
mode == AccessMode.WRITE && writers.size() == 0;
}

private Set<BoltServerAddress> servers()
{
Set<BoltServerAddress> servers = new HashSet<>();
servers.addAll( readers.servers() );
servers.addAll( writers.servers() );
servers.addAll( routers.servers() );
return servers;
}

@Override
public synchronized Set<BoltServerAddress> update( ClusterComposition cluster )
public synchronized RoutingTableChange update( ClusterComposition cluster )
{
expirationTimeout = cluster.expirationTimestamp();
Set<BoltServerAddress> removed = new HashSet<>();
readers.update( cluster.readers(), removed );
writers.update( cluster.writers(), removed );
routers.update( cluster.routers(), removed );
return removed;
Set<BoltServerAddress> previousServers = servers();

readers.update( cluster.readers() );
writers.update( cluster.writers() );
routers.update( cluster.routers() );
Set<BoltServerAddress> currentServers = servers();

Set<BoltServerAddress> added = new HashSet<>( currentServers );
Set<BoltServerAddress> removed = new HashSet<>( previousServers );
added.removeAll( previousServers );
removed.removeAll( currentServers );
return new RoutingTableChange( added, removed );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Set;

import org.neo4j.driver.internal.RoutingErrorHandler;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.ConnectionPool;
Expand All @@ -35,6 +33,7 @@
public class LoadBalancer implements ConnectionProvider, RoutingErrorHandler, AutoCloseable
{
private static final String LOAD_BALANCER_LOG_NAME = "LoadBalancer";
private static final boolean PURGE_ON_ERROR = Boolean.getBoolean( "purgeOnError" );

private final ConnectionPool connections;
private final RoutingTable routingTable;
Expand Down Expand Up @@ -113,8 +112,14 @@ private synchronized void forget( BoltServerAddress address )
{
// First remove from the load balancer, to prevent concurrent threads from making connections to them.
routingTable.forget( address );
// drop all current connections to the address
connections.purge( address );
if ( PURGE_ON_ERROR )
{
connections.purge( address );
}
else
{
connections.deactivate( address );
}
}

synchronized void ensureRouting( AccessMode mode )
Expand All @@ -131,16 +136,35 @@ synchronized void refreshRoutingTable()

// get a new routing table
ClusterComposition cluster = rediscovery.lookupClusterComposition( routingTable, connections );
Set<BoltServerAddress> removed = routingTable.update( cluster );
// purge connections to removed addresses
for ( BoltServerAddress address : removed )
{
connections.purge( address );
}
RoutingTableChange routingTableChange = routingTable.update( cluster );
updateConnectionPool( routingTableChange );

log.info( "Refreshed routing information. %s", routingTable );
}

private void updateConnectionPool( RoutingTableChange routingTableChange )
{
if ( PURGE_ON_ERROR )
{
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
{
connections.purge( removedAddress );
}
}
else
{
for ( BoltServerAddress addedAddress : routingTableChange.added() )
{
connections.activate( addedAddress );
}
for ( BoltServerAddress removedAddress : routingTableChange.removed() )
{
connections.deactivate( removedAddress );
}
connections.compact();
}
}

private RoundRobinAddressSet addressSetFor( AccessMode mode )
{
switch ( mode )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.neo4j.driver.internal.cluster;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -45,6 +46,11 @@ public BoltServerAddress next()
return addresses[next( addresses.length )];
}

public Set<BoltServerAddress> servers()
{
return new HashSet<>( Arrays.asList( addresses ) );
}

int next( int divisor )
{
int index = offset.getAndIncrement();
Expand All @@ -55,54 +61,9 @@ int next( int divisor )
return index % divisor;
}

public synchronized void update( Set<BoltServerAddress> addresses, Set<BoltServerAddress> removed )
public synchronized void update( Set<BoltServerAddress> addresses )
{
BoltServerAddress[] prev = this.addresses;
if ( addresses.isEmpty() )
{
this.addresses = NONE;
return;
}
if ( prev.length == 0 )
{
this.addresses = addresses.toArray( NONE );
return;
}
BoltServerAddress[] copy = null;
if ( addresses.size() != prev.length )
{
copy = new BoltServerAddress[addresses.size()];
}
int j = 0;
for ( int i = 0; i < prev.length; i++ )
{
if ( addresses.remove( prev[i] ) )
{
if ( copy != null )
{
copy[j++] = prev[i];
}
}
else
{
removed.add( prev[i] );
if ( copy == null )
{
copy = new BoltServerAddress[prev.length];
System.arraycopy( prev, 0, copy, 0, i );
j = i;
}
}
}
if ( copy == null )
{
return;
}
for ( BoltServerAddress address : addresses )
{
copy[j++] = address;
}
this.addresses = copy;
this.addresses = addresses.toArray( NONE );
}

public synchronized void remove( BoltServerAddress address )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.Set;

import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.v1.AccessMode;

public interface RoutingTable
{
boolean isStaleFor( AccessMode mode );

Set<BoltServerAddress> update( ClusterComposition cluster );
RoutingTableChange update( ClusterComposition cluster );

void forget( BoltServerAddress address );

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.Collections;
import java.util.Set;

import org.neo4j.driver.internal.net.BoltServerAddress;

import static java.util.Collections.unmodifiableSet;

public class RoutingTableChange
{
public static final RoutingTableChange EMPTY = new RoutingTableChange(
Collections.<BoltServerAddress>emptySet(), Collections.<BoltServerAddress>emptySet() );

private final Set<BoltServerAddress> added;
private final Set<BoltServerAddress> removed;

public RoutingTableChange( Set<BoltServerAddress> added, Set<BoltServerAddress> removed )
{
this.added = added;
this.removed = removed;
}

public Set<BoltServerAddress> added()
{
return unmodifiableSet( added );
}

public Set<BoltServerAddress> removed()
{
return unmodifiableSet( removed );
}

@Override
public String toString()
{
return "RoutingTableChange{" +
"added=" + added +
", removed=" + removed +
'}';
}
}
Loading