Skip to content

Commit 16eb4fc

Browse files
authored
Merge pull request #429 from lutovich/1.5-buffer-handling
Fixed couple netty buffer leaks
2 parents aac5738 + 8ad6626 commit 16eb4fc

15 files changed

+79
-55
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/inbound/MessageDecoder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,21 @@ public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exceptio
4141
}
4242

4343
@Override
44-
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out ) throws Exception
44+
protected void decode( ChannelHandlerContext ctx, ByteBuf in, List<Object> out )
4545
{
4646
if ( readMessageBoundary )
4747
{
4848
// now we have a complete message in the input buffer
4949

50-
// increment ref count of the buffer because we will pass it's duplicate through
51-
in.retain();
52-
ByteBuf res = in.duplicate();
50+
// increment ref count of the buffer and create it's duplicate that shares the content
51+
// duplicate will be the output of this decoded and input for the next one
52+
ByteBuf messageBuf = in.retainedDuplicate();
5353

5454
// signal that whole message was read by making input buffer seem like it was fully read/consumed
5555
in.readerIndex( in.readableBytes() );
5656

5757
// pass the full message to the next handler in the pipeline
58-
out.add( res );
58+
out.add( messageBuf );
5959

6060
readMessageBoundary = false;
6161
}

driver/src/main/java/org/neo4j/driver/internal/async/outbound/OutboundMessageHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,14 @@ protected void encode( ChannelHandlerContext ctx, Message msg, List<Object> out
6565
try
6666
{
6767
writer.write( msg );
68+
output.stop();
6869
}
6970
catch ( Throwable error )
70-
{
71-
throw new EncoderException( "Failed to write outbound message: " + msg, error );
72-
}
73-
finally
7471
{
7572
output.stop();
73+
// release buffer because it will not get added to the out list and no other handler is going to handle it
74+
messageBuf.release();
75+
throw new EncoderException( "Failed to write outbound message: " + msg, error );
7676
}
7777

7878
if ( log.isTraceEnabled() )

driver/src/test/java/org/neo4j/driver/internal/async/ChannelAttributesTest.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.neo4j.driver.internal.async;
2020

2121
import io.netty.channel.embedded.EmbeddedChannel;
22-
import org.junit.After;
2322
import org.junit.Test;
2423

2524
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
@@ -47,12 +46,6 @@ public class ChannelAttributesTest
4746
{
4847
private final EmbeddedChannel channel = new EmbeddedChannel();
4948

50-
@After
51-
public void tearDown() throws Exception
52-
{
53-
channel.close();
54-
}
55-
5649
@Test
5750
public void shouldSetAndGetAddress()
5851
{

driver/src/test/java/org/neo4j/driver/internal/async/ChannelConnectedListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class ChannelConnectedListenerTest
4545
@After
4646
public void tearDown()
4747
{
48-
channel.close();
48+
channel.finishAndReleaseAll();
4949
}
5050

5151
@Test

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ public class HandshakeCompletedListenerTest
4949
private final EmbeddedChannel channel = new EmbeddedChannel();
5050

5151
@After
52-
public void tearDown() throws Exception
52+
public void tearDown()
5353
{
54-
channel.close();
54+
channel.finishAndReleaseAll();
5555
}
5656

5757
@Test

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeResponseHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void setUp()
5959
@After
6060
public void tearDown()
6161
{
62-
channel.close();
62+
channel.finishAndReleaseAll();
6363
}
6464

6565
@Test

driver/src/test/java/org/neo4j/driver/internal/async/NettyChannelInitializerTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.channel.embedded.EmbeddedChannel;
2222
import io.netty.handler.ssl.SslHandler;
23+
import org.junit.After;
2324
import org.junit.Test;
2425

2526
import org.neo4j.driver.internal.security.SecurityPlan;
@@ -31,20 +32,27 @@
3132
import static org.junit.Assert.assertNull;
3233
import static org.mockito.Mockito.mock;
3334
import static org.mockito.Mockito.when;
35+
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3436
import static org.neo4j.driver.internal.async.ChannelAttributes.creationTimestamp;
3537
import static org.neo4j.driver.internal.async.ChannelAttributes.messageDispatcher;
3638
import static org.neo4j.driver.internal.async.ChannelAttributes.serverAddress;
37-
import static org.neo4j.driver.internal.async.BoltServerAddress.LOCAL_DEFAULT;
3839

3940
public class NettyChannelInitializerTest
4041
{
42+
private final EmbeddedChannel channel = new EmbeddedChannel();
43+
44+
@After
45+
public void tearDown()
46+
{
47+
channel.finishAndReleaseAll();
48+
}
49+
4150
@Test
4251
public void shouldAddSslHandlerWhenRequiresEncryption() throws Exception
4352
{
4453
SecurityPlan security = SecurityPlan.forAllCertificates();
4554
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
4655

47-
EmbeddedChannel channel = new EmbeddedChannel();
4856
initializer.initChannel( channel );
4957

5058
assertNotNull( channel.pipeline().get( SslHandler.class ) );
@@ -56,7 +64,6 @@ public void shouldNotAddSslHandlerWhenDoesNotRequireEncryption()
5664
SecurityPlan security = SecurityPlan.insecure();
5765
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, new FakeClock() );
5866

59-
EmbeddedChannel channel = new EmbeddedChannel();
6067
initializer.initChannel( channel );
6168

6269
assertNull( channel.pipeline().get( SslHandler.class ) );
@@ -70,7 +77,6 @@ public void shouldUpdateChannelAttributes()
7077
SecurityPlan security = SecurityPlan.insecure();
7178
NettyChannelInitializer initializer = new NettyChannelInitializer( LOCAL_DEFAULT, security, clock );
7279

73-
EmbeddedChannel channel = new EmbeddedChannel();
7480
initializer.initChannel( channel );
7581

7682
assertEquals( LOCAL_DEFAULT, serverAddress( channel ) );

driver/src/test/java/org/neo4j/driver/internal/async/inbound/ChunkDecoderTest.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import io.netty.buffer.ByteBuf;
2222
import io.netty.channel.embedded.EmbeddedChannel;
23+
import org.junit.After;
2324
import org.junit.Test;
2425

2526
import static io.netty.buffer.Unpooled.buffer;
@@ -28,9 +29,18 @@
2829
import static org.junit.Assert.assertEquals;
2930
import static org.junit.Assert.assertFalse;
3031
import static org.junit.Assert.assertTrue;
32+
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;
3133

3234
public class ChunkDecoderTest
3335
{
36+
private final EmbeddedChannel channel = new EmbeddedChannel( new ChunkDecoder() );
37+
38+
@After
39+
public void tearDown()
40+
{
41+
channel.finishAndReleaseAll();
42+
}
43+
3444
@Test
3545
public void shouldDecodeFullChunk()
3646
{
@@ -54,7 +64,7 @@ public void shouldDecodeFullChunk()
5464
// there should only be a single chunk available for reading
5565
assertEquals( 1, channel.inboundMessages().size() );
5666
// it should have no size header and expected body
57-
assertEquals( input.slice( 2, 7 ), channel.readInbound() );
67+
assertByteBufEquals( input.slice( 2, 7 ), channel.readInbound() );
5868
}
5969

6070
@Test
@@ -97,7 +107,7 @@ public void shouldDecodeSplitChunk()
97107
// there should only be a single chunk available for reading
98108
assertEquals( 1, channel.inboundMessages().size() );
99109
// it should have no size header and expected body
100-
assertEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() );
110+
assertByteBufEquals( wrappedBuffer( new byte[]{1, 11, 2, 22, 3, 33, 4, 44, 5} ), channel.readInbound() );
101111
}
102112

103113
@Test
@@ -113,6 +123,6 @@ public void shouldDecodeEmptyChunk()
113123
// there should only be a single chunk available for reading
114124
assertEquals( 1, channel.inboundMessages().size() );
115125
// it should have no size header and empty body
116-
assertEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
126+
assertByteBufEquals( wrappedBuffer( new byte[0] ), channel.readInbound() );
117127
}
118128
}

driver/src/test/java/org/neo4j/driver/internal/async/inbound/InboundMessageHandlerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void tearDown()
7676
{
7777
if ( channel != null )
7878
{
79-
channel.close();
79+
channel.finishAndReleaseAll();
8080
}
8181
}
8282

driver/src/test/java/org/neo4j/driver/internal/async/inbound/MessageDecoderTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,41 +19,47 @@
1919
package org.neo4j.driver.internal.async.inbound;
2020

2121
import io.netty.channel.embedded.EmbeddedChannel;
22+
import org.junit.After;
2223
import org.junit.Test;
2324

2425
import static io.netty.buffer.Unpooled.wrappedBuffer;
2526
import static org.junit.Assert.assertEquals;
2627
import static org.junit.Assert.assertFalse;
2728
import static org.junit.Assert.assertTrue;
29+
import static org.neo4j.driver.v1.util.TestUtil.assertByteBufEquals;
2830

2931
public class MessageDecoderTest
3032
{
33+
private final EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );
34+
35+
@After
36+
public void tearDown()
37+
{
38+
channel.finishAndReleaseAll();
39+
}
40+
3141
@Test
3242
public void shouldDecodeMessageWithSingleChunk()
3343
{
34-
EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );
35-
3644
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ) ) );
3745
assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) );
3846
assertTrue( channel.finish() );
3947

4048
assertEquals( 1, channel.inboundMessages().size() );
41-
assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() );
49+
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5} ), channel.readInbound() );
4250
}
4351

4452
@Test
4553
public void shouldDecodeMessageWithMultipleChunks()
4654
{
47-
EmbeddedChannel channel = new EmbeddedChannel( new MessageDecoder() );
48-
4955
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{1, 2, 3} ) ) );
5056
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{4, 5} ) ) );
5157
assertFalse( channel.writeInbound( wrappedBuffer( new byte[]{6, 7, 8} ) ) );
5258
assertTrue( channel.writeInbound( wrappedBuffer( new byte[0] ) ) );
5359
assertTrue( channel.finish() );
5460

5561
assertEquals( 1, channel.inboundMessages().size() );
56-
assertEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() );
62+
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3, 4, 5, 6, 7, 8} ), channel.readInbound() );
5763
}
5864

5965
@Test
@@ -73,8 +79,8 @@ public void shouldDecodeMultipleConsecutiveMessages()
7379
channel.writeInbound( wrappedBuffer( new byte[0] ) );
7480

7581
assertEquals( 3, channel.inboundMessages().size() );
76-
assertEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() );
77-
assertEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() );
78-
assertEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() );
82+
assertByteBufEquals( wrappedBuffer( new byte[]{1, 2, 3} ), channel.readInbound() );
83+
assertByteBufEquals( wrappedBuffer( new byte[]{4, 5, 6} ), channel.readInbound() );
84+
assertByteBufEquals( wrappedBuffer( new byte[]{7, 8, 9, 10} ), channel.readInbound() );
7985
}
8086
}

0 commit comments

Comments
 (0)