Skip to content

Treat request errors as fatal #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,29 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;

import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;

public class ChannelConnectedListener implements ChannelFutureListener
{
private final BoltServerAddress address;
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;

public ChannelConnectedListener( BoltServerAddress address, ChannelPromise handshakeCompletedPromise,
Logging logging )
public ChannelConnectedListener( BoltServerAddress address, ChannelPipelineBuilder pipelineBuilder,
ChannelPromise handshakeCompletedPromise, Logging logging )
{
this.address = requireNonNull( address );
this.handshakeCompletedPromise = requireNonNull( handshakeCompletedPromise );
this.logging = requireNonNull( logging );
this.address = address;
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
}

@Override
Expand All @@ -51,7 +53,8 @@ public void operationComplete( ChannelFuture future )

if ( future.isSuccess() )
{
channel.pipeline().addLast( new HandshakeResponseHandler( handshakeCompletedPromise, logging ) );
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast( new HandshakeResponseHandler( pipelineBuilder, handshakeCompletedPromise, logging ) );
ChannelFuture handshakeFuture = channel.writeAndFlush( handshake() );

handshakeFuture.addListener( channelFuture ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,25 @@ public class ChannelConnectorImpl implements ChannelConnector
private final String userAgent;
private final Map<String,Value> authToken;
private final SecurityPlan securityPlan;
private final ChannelPipelineBuilder pipelineBuilder;
private final int connectTimeoutMillis;
private final Logging logging;
private final Clock clock;

public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan, Logging logging,
Clock clock )
{
this( connectionSettings, securityPlan, new ChannelPipelineBuilderImpl(), logging, clock );
}

public ChannelConnectorImpl( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
ChannelPipelineBuilder pipelineBuilder, Logging logging, Clock clock )
{
this.userAgent = connectionSettings.userAgent();
this.authToken = tokenAsMap( connectionSettings.authToken() );
this.connectTimeoutMillis = connectionSettings.connectTimeoutMillis();
this.securityPlan = requireNonNull( securityPlan );
this.pipelineBuilder = pipelineBuilder;
this.logging = requireNonNull( logging );
this.clock = requireNonNull( clock );
}
Expand All @@ -70,8 +78,10 @@ public ChannelFuture connect( BoltServerAddress address, Bootstrap bootstrap )
ChannelPromise handshakeCompleted = channel.newPromise();
ChannelPromise connectionInitialized = channel.newPromise();

channelConnected.addListener( new ChannelConnectedListener( address, handshakeCompleted, logging ) );
handshakeCompleted.addListener( new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );
channelConnected.addListener(
new ChannelConnectedListener( address, pipelineBuilder, handshakeCompleted, logging ) );
handshakeCompleted.addListener(
new HandshakeCompletedListener( userAgent, authToken, connectionInitialized ) );

return connectionInitialized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private void fail( ChannelHandlerContext ctx, Throwable error )
{
Throwable cause = transformError( error );
messageDispatcher.handleFatalError( cause );
log.debug( "Closing channel: %s", ctx.channel() );
log.debug( "Closing channel because of an error: %s", ctx.channel() );
ctx.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,12 @@
*/
package org.neo4j.driver.internal.async;

import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;

import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.v1.Logging;

public final class ChannelPipelineBuilder
public interface ChannelPipelineBuilder
{
private ChannelPipelineBuilder()
{
}

public static void buildPipeline( Channel channel, MessageFormat messageFormat, Logging logging )
{
ChannelPipeline pipeline = channel.pipeline();

// inbound handlers
pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageDecoder() );
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );

// outbound handlers
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );

// last one - error handler
pipeline.addLast( new ChannelErrorHandler( logging ) );
}
void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging );
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.async;

import io.netty.channel.ChannelPipeline;

import org.neo4j.driver.internal.async.inbound.ChunkDecoder;
import org.neo4j.driver.internal.async.inbound.InboundMessageHandler;
import org.neo4j.driver.internal.async.inbound.MessageDecoder;
import org.neo4j.driver.internal.async.outbound.OutboundMessageHandler;
import org.neo4j.driver.internal.messaging.MessageFormat;
import org.neo4j.driver.v1.Logging;

public class ChannelPipelineBuilderImpl implements ChannelPipelineBuilder
{
@Override
public void build( MessageFormat messageFormat, ChannelPipeline pipeline, Logging logging )
{
// inbound handlers
pipeline.addLast( new ChunkDecoder() );
pipeline.addLast( new MessageDecoder() );
pipeline.addLast( new InboundMessageHandler( messageFormat, logging ) );

// outbound handlers
pipeline.addLast( OutboundMessageHandler.NAME, new OutboundMessageHandler( messageFormat, logging ) );

// last one - error handler
pipeline.addLast( new ChannelErrorHandler( logging ) );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,15 @@

public class HandshakeResponseHandler extends ReplayingDecoder<Void>
{
private final ChannelPipelineBuilder pipelineBuilder;
private final ChannelPromise handshakeCompletedPromise;
private final Logging logging;
private final Logger log;

public HandshakeResponseHandler( ChannelPromise handshakeCompletedPromise, Logging logging )
public HandshakeResponseHandler( ChannelPipelineBuilder pipelineBuilder, ChannelPromise handshakeCompletedPromise,
Logging logging )
{
this.pipelineBuilder = pipelineBuilder;
this.handshakeCompletedPromise = handshakeCompletedPromise;
this.logging = logging;
this.log = logging.getLog( getClass().getSimpleName() );
Expand Down Expand Up @@ -80,10 +83,9 @@ protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
switch ( serverSuggestedVersion )
{
case PROTOCOL_VERSION_1:
MessageFormat format = new PackStreamMessageFormatV1();
ChannelPipelineBuilder.buildPipeline( ctx.channel(), format, logging );
MessageFormat messageFormat = new PackStreamMessageFormatV1();
pipelineBuilder.build( messageFormat, pipeline, logging );
handshakeCompletedPromise.setSuccess();

break;
case NO_PROTOCOL_VERSION:
fail( ctx, protocolNoSupportedByServerError() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ public void handleFailureMessage( String code, String message )
log.debug( "Received FAILURE message with code '%s' and message '%s'", code, message );
currentError = ErrorUtil.newNeo4jError( code, message );

if ( ErrorUtil.isFatal( currentError ) )
{
// we should not continue using channel after a fatal error
// fire error event back to the pipeline and avoid sending ACK_FAILURE
channel.pipeline().fireExceptionCaught( currentError );
return;
}

// try to write ACK_FAILURE before notifying the next response handler
ackFailureIfNeeded();

Expand Down Expand Up @@ -180,6 +188,11 @@ public Throwable currentError()
return currentError;
}

public boolean fatalErrorOccurred()
{
return fatalErrorOccurred;
}

/**
* Makes this message dispatcher not send ACK_FAILURE in response to FAILURE until it's un-muted using
* {@link #unMuteAckFailure()}. Muting ACK_FAILURE is needed <b>only</b> when sending RESET message. RESET "jumps"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,16 @@ public void handlerRemoved( ChannelHandlerContext ctx )
@Override
protected void channelRead0( ChannelHandlerContext ctx, ByteBuf msg )
{
if ( messageDispatcher.fatalErrorOccurred() )
{
log.warn( "Message ignored because of the previous fatal error. Channel will be closed. Message:\n%s\n",
prettyHexDump( msg ) );
return;
}

if ( log.isTraceEnabled() )
{
log.trace( "Inbound message received: \n%s\n", prettyHexDump( msg ) );
log.trace( "Inbound message received:\n%s\n", prettyHexDump( msg ) );
}

input.start( msg );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
}
catch ( Throwable error )
{
EncoderException exception = new EncoderException( "Failed to write outbound message: " + msg, error );
// tell ChannelErrorHandler which is the last handler in the pipeline about this error
ctx.fireExceptionCaught( exception );
// rethrow, encoder contract requires handler to either fail or populate out list
throw exception;
throw new EncoderException( "Failed to write outbound message: " + msg, error );
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,21 @@ public static Neo4jException newNeo4jError( String code, String message )
}
}

// todo: use this method and close channel after unrecoverable error
public static boolean isRecoverable( Throwable error )
public static boolean isFatal( Throwable error )
{
if ( error instanceof Neo4jException )
{
if ( isProtocolViolationError( ((Neo4jException) error) ) )
{
return false;
return true;
}

if ( isClientOrTransientError( ((Neo4jException) error) ) )
{
return true;
return false;
}
}
return false;
return true;
}

private static boolean isProtocolViolationError( Neo4jException error )
Expand All @@ -84,6 +83,10 @@ private static boolean isClientOrTransientError( Neo4jException error )
private static String extractClassification( String code )
{
String[] parts = code.split( "\\." );
if ( parts.length < 2 )
{
return "";
}
return parts[1];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.internal.async.ProtocolUtil.handshake;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
import static org.neo4j.driver.v1.util.TestUtil.await;

public class ChannelConnectedListenerTest
{
private final EmbeddedChannel channel = new EmbeddedChannel();

@After
public void tearDown() throws Exception
public void tearDown()
{
channel.close();
}
Expand Down Expand Up @@ -90,6 +90,7 @@ public void shouldWriteHandshakeWhenChannelConnected()

private static ChannelConnectedListener newListener( ChannelPromise handshakeCompletedPromise )
{
return new ChannelConnectedListener( LOCAL_DEFAULT, handshakeCompletedPromise, DEV_NULL_LOGGING );
return new ChannelConnectedListener( LOCAL_DEFAULT, new ChannelPipelineBuilderImpl(),
handshakeCompletedPromise, DEV_NULL_LOGGING );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@
import static org.junit.Assert.assertThat;
import static org.neo4j.driver.internal.logging.DevNullLogging.DEV_NULL_LOGGING;

public class ChannelPipelineBuilderTest
public class ChannelPipelineBuilderImplTest
{
@Test
public void shouldBuildPipeline()
{
EmbeddedChannel channel = new EmbeddedChannel();
ChannelAttributes.setMessageDispatcher( channel, new InboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );

ChannelPipelineBuilder.buildPipeline( channel, new PackStreamMessageFormatV1(), DEV_NULL_LOGGING );
new ChannelPipelineBuilderImpl().build( new PackStreamMessageFormatV1(), channel.pipeline(), DEV_NULL_LOGGING );

Iterator<Map.Entry<String,ChannelHandler>> iterator = channel.pipeline().iterator();
assertThat( iterator.next().getValue(), instanceOf( ChunkDecoder.class ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ private void testFailure( int serverSuggestedVersion, String expectedMessagePref

private static HandshakeResponseHandler newHandler( ChannelPromise handshakeCompletedPromise )
{
return new HandshakeResponseHandler( handshakeCompletedPromise, DEV_NULL_LOGGING );
return new HandshakeResponseHandler( new ChannelPipelineBuilderImpl(), handshakeCompletedPromise,
DEV_NULL_LOGGING );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.neo4j.driver.internal.InternalNode;
import org.neo4j.driver.internal.InternalPath;
import org.neo4j.driver.internal.InternalRelationship;
import org.neo4j.driver.internal.async.ChannelPipelineBuilder;
import org.neo4j.driver.internal.async.ChannelPipelineBuilderImpl;
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
import org.neo4j.driver.internal.async.outbound.ChunkAwareByteBufOutput;
import org.neo4j.driver.internal.packstream.PackStream;
Expand Down Expand Up @@ -151,7 +151,7 @@ private EmbeddedChannel newEmbeddedChannel()
{
EmbeddedChannel channel = new EmbeddedChannel();
setMessageDispatcher( channel, new MemorizingInboundMessageDispatcher( channel, DEV_NULL_LOGGING ) );
ChannelPipelineBuilder.buildPipeline( channel, format, DEV_NULL_LOGGING );
new ChannelPipelineBuilderImpl().build( format, channel.pipeline(), DEV_NULL_LOGGING );
return channel;
}

Expand Down
Loading