Skip to content

Throw ServiceUnavailableException when socket write fails #312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class RoutingTransaction implements Transaction
private final BoltServerAddress address;
private final RoutingErrorHandler onError;

RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
public RoutingTransaction( Transaction delegate, AccessMode mode, BoltServerAddress address,
RoutingErrorHandler onError )
{
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ private LoadBalancer(
RoutingTable routingTable,
ClusterCompositionProvider provider ) throws ServiceUnavailableException
{
this( log, connections, routingTable, new Rediscovery( settings, clock, log, provider ) );
this( routingTable, connections, new Rediscovery( settings, clock, log, provider ), log );
}

LoadBalancer( Logger log, ConnectionPool connections, RoutingTable routingTable, Rediscovery rediscovery )
LoadBalancer( RoutingTable routingTable, ConnectionPool connections, Rediscovery rediscovery, Logger log )
throws ServiceUnavailableException
{
this.log = log;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ interface Writer
Writer write( Message msg ) throws IOException;

Writer flush() throws IOException;

Writer reset( WritableByteChannel channel );
}

interface Reader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,6 @@ public Writer write( Message msg ) throws IOException
return this;
}

@Override
public Writer reset( WritableByteChannel channel )
{
packer.reset( channel );
return this;
}

private void packNode( Node node ) throws IOException
{
packer.packStructHeader( NODE_FIELDS, NODE );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.neo4j.driver.v1.Value;
import org.neo4j.driver.v1.exceptions.ClientException;
import org.neo4j.driver.v1.exceptions.Neo4jException;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.ServerInfo;

import static java.lang.String.format;
Expand Down Expand Up @@ -165,8 +166,7 @@ public synchronized void flush()
}
catch ( IOException e )
{
String message = e.getMessage();
throw new ClientException( "Unable to send messages to server: " + message, e );
throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e );
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.neo4j.driver.internal.packstream;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -158,16 +157,6 @@ public Packer( PackOutput out )
this.out = out;
}

public void reset( PackOutput out )
{
this.out = out;
}

public void reset( WritableByteChannel channel )
{
((BufferedChannelOutput) out).reset( channel );
}

public void flush() throws IOException
{
out.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,26 @@
import org.junit.Test;
import org.mockito.InOrder;

import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.neo4j.driver.internal.RoutingTransaction;
import org.neo4j.driver.internal.net.BoltServerAddress;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.v1.AccessMode;
import org.neo4j.driver.v1.Transaction;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.exceptions.SessionExpiredException;

import static java.util.Arrays.asList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -49,27 +58,28 @@ public void ensureRoutingShouldUpdateRoutingTableAndPurgeConnectionPoolWhenStale
RoutingTable routingTable = mock( RoutingTable.class );
Rediscovery rediscovery = mock( Rediscovery.class );
when( routingTable.isStale() ).thenReturn( true );
HashSet<BoltServerAddress> set = new HashSet<>( asList( new BoltServerAddress( "abc", 12 ) ) );
Set<BoltServerAddress> set = Collections.singleton( new BoltServerAddress( "abc", 12 ) );
when( routingTable.update( any( ClusterComposition.class ) ) ).thenReturn( set );

// when
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, conns, routingTable, rediscovery );
LoadBalancer balancer = new LoadBalancer( routingTable, conns, rediscovery, DEV_NULL_LOGGER );

// then
assertNotNull( balancer );
InOrder inOrder = inOrder( rediscovery, routingTable, conns );
inOrder.verify( rediscovery ).lookupRoutingTable( conns, routingTable );
inOrder.verify( routingTable ).update( any( ClusterComposition.class ) );
inOrder.verify( conns ).purge( new BoltServerAddress( "abc", 12 ) );
}


@Test
public void shouldEnsureRoutingOnInitialization() throws Exception
{
// given & when
final AtomicInteger ensureRoutingCounter = new AtomicInteger( 0 );
LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, mock( ConnectionPool.class ),
mock( RoutingTable.class ), mock( Rediscovery.class ) ) {
LoadBalancer balancer = new LoadBalancer( mock( RoutingTable.class ), mock( ConnectionPool.class ),
mock( Rediscovery.class ), DEV_NULL_LOGGER )
{
@Override
public void ensureRouting()
{
Expand All @@ -78,6 +88,7 @@ public void ensureRouting()
};

// then
assertNotNull( balancer );
assertThat( ensureRoutingCounter.get(), equalTo( 1 ) );
}

Expand Down Expand Up @@ -129,9 +140,36 @@ private LoadBalancer setupLoadBalancer( Connection writerConn, Connection readCo
when( routingTable.readers() ).thenReturn( readerAddrs );
when( routingTable.writers() ).thenReturn( writerAddrs );

LoadBalancer balancer = new LoadBalancer( DEV_NULL_LOGGER, connPool,
routingTable, mock( Rediscovery.class ) ) ;
return new LoadBalancer( routingTable, connPool, mock( Rediscovery.class ), DEV_NULL_LOGGER );
}

return balancer;
@Test
public void shouldForgetAddressAndItsConnectionsOnServiceUnavailable()
{
Transaction tx = mock( Transaction.class );
RoutingTable routingTable = mock( RoutingTable.class );
ConnectionPool connectionPool = mock( ConnectionPool.class );
Rediscovery rediscovery = mock( Rediscovery.class );
LoadBalancer loadBalancer = new LoadBalancer( routingTable, connectionPool, rediscovery, DEV_NULL_LOGGER );
BoltServerAddress address = new BoltServerAddress( "host", 42 );

RoutingTransaction routingTx = new RoutingTransaction( tx, AccessMode.WRITE, address, loadBalancer );

ServiceUnavailableException txCloseError = new ServiceUnavailableException( "Oh!" );
doThrow( txCloseError ).when( tx ).close();

try
{
routingTx.close();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( SessionExpiredException.class ) );
assertThat( e.getCause(), instanceOf( ServiceUnavailableException.class ) );
}

verify( routingTable ).forget( address );
verify( connectionPool ).purge( address );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,22 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Queue;

import org.neo4j.driver.internal.messaging.Message;
import org.neo4j.driver.internal.messaging.SuccessMessage;
import org.neo4j.driver.internal.summary.InternalServerInfo;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Values;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
import org.neo4j.driver.v1.summary.ServerInfo;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
Expand All @@ -45,16 +48,20 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.neo4j.driver.internal.logging.DevNullLogger.DEV_NULL_LOGGER;
import static org.neo4j.driver.internal.net.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.v1.Values.parameters;

public class SocketConnectionTest
{
private static final InternalServerInfo SERVER_INFO = new InternalServerInfo( LOCAL_DEFAULT, "test" );

@Test
public void shouldReceiveServerInfoAfterInit() throws Throwable
{
// Given
SocketClient socket = mock( SocketClient.class );
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
SocketConnection conn = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );

when( socket.address() ).thenReturn( BoltServerAddress.from( URI.create( "http://neo4j.com:9000" ) ) );

Expand Down Expand Up @@ -98,7 +105,7 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
// Then
try
{
SocketConnection conn = new SocketConnection( socket, mock( InternalServerInfo.class ), mock( Logger.class ) );
new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );
fail( "should have failed with the provided exception" );
}
catch( Throwable e )
Expand All @@ -108,4 +115,26 @@ public void shouldCloseConnectionIfFailedToCreate() throws Throwable
}
verify( socket, times( 1 ) ).stop();
}

@Test
@SuppressWarnings( "unchecked" )
public void flushThrowsWhenSocketIsBroken() throws Exception
{
SocketClient socket = mock( SocketClient.class );
IOException sendError = new IOException( "Unable to send" );
doThrow( sendError ).when( socket ).send( any( Queue.class ) );

SocketConnection connection = new SocketConnection( socket, SERVER_INFO, DEV_NULL_LOGGER );

try
{
connection.flush();
fail( "Exception expected" );
}
catch ( Exception e )
{
assertThat( e, instanceOf( ServiceUnavailableException.class ) );
assertSame( sendError, e.getCause() );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,8 @@ public BufferedChannelInput(ReadableByteChannel ch )
public BufferedChannelInput( int bufferCapacity, ReadableByteChannel ch )
{
this.buffer = ByteBuffer.allocate( bufferCapacity ).order( ByteOrder.BIG_ENDIAN );
reset( ch );
}

public BufferedChannelInput reset( ReadableByteChannel ch )
{
this.channel = ch;
this.buffer.position( 0 );
this.buffer.limit( 0 );
return this;
this.channel = ch;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,7 @@
public class BufferedChannelOutput implements PackOutput
{
private final ByteBuffer buffer;
private WritableByteChannel channel;

public BufferedChannelOutput( int bufferSize )
{
this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN );
}
private final WritableByteChannel channel;

public BufferedChannelOutput( WritableByteChannel channel )
{
Expand All @@ -40,12 +35,7 @@ public BufferedChannelOutput( WritableByteChannel channel )

public BufferedChannelOutput( WritableByteChannel channel, int bufferSize )
{
this( bufferSize );
reset( channel );
}

public void reset( WritableByteChannel channel )
{
this.buffer = ByteBuffer.allocate( bufferSize ).order( ByteOrder.BIG_ENDIAN );
this.channel = channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ public void shouldHandleGracefulLeaderSwitch() throws Exception
parameters( "name", "Webber", "title", "Mr" ) );
tx1.success();

closeAndExpectException( tx1, ClientException.class );
closeAndExpectException( session1, ClientException.class );
closeAndExpectException( tx1, SessionExpiredException.class );
session1.close();

String bookmark = inExpirableSession( driver, createSession(), new Function<Session,String>()
{
Expand Down