Skip to content

netty: netty tests resource leakage issue #11593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 45 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
07c8b0b
Defect-3353 - Added the Suggestions by Eric with recent issues commen…
vinodhabib Aug 5, 2024
58ecc92
Defect-3353 - Added the Suggestions by Eric with recent issues commen…
vinodhabib Aug 5, 2024
74674ea
Defect-3353 - Added the Suggestions by Eric with recent issues commen…
vinodhabib Aug 5, 2024
2f60656
Defect-3353 - Added the Suggestions by Eric with recent issues commen…
vinodhabib Aug 6, 2024
7b4a65a
Defect-3353 - Fixed 3 UTs with Leakage issue.
vinodhabib Aug 6, 2024
ec566f7
Defect-3353 - Fixed dataSizeSincePingAccumulates UT case.
vinodhabib Aug 9, 2024
5989fd7
Defect-3353 - Fixed Failing 3 UTs
vinodhabib Aug 9, 2024
148833b
Defect-3353 - Fixed/Modified the maxRstCount related UTs.
vinodhabib Aug 9, 2024
9f9b873
Defect-3353 - Fixed Failing UT in NettyClientHandlerTest
vinodhabib Aug 9, 2024
0018b16
Defect-3353 - Fixed Randomly Failing UTs in Base Test
vinodhabib Aug 12, 2024
e4d85c5
Defect-3353 - Updated to framer.close() method will flush, closes the…
vinodhabib Aug 12, 2024
8530a63
Defect-3353 - Fixed UT - inboundDataShouldForwardToStreamListener
vinodhabib Aug 16, 2024
0ec3f2a
Defect-3353 - Fixed UT - inboundDataShouldForwardToStreamListener
vinodhabib Aug 16, 2024
41ec48e
Defect-3353 - Fixed UT - windowUpdateMatchesTarget
vinodhabib Aug 16, 2024
fd64d5d
Merge branch 'master' into defect-3353-nettyTestsResourceLeakFix
vinodhabib Aug 16, 2024
b5cc9c8
Defect-3353 - Updated rapidReset method
vinodhabib Aug 16, 2024
0ecd460
Defect-3353 - Removed frame.clear() as per suggestions
vinodhabib Aug 21, 2024
ff5bd67
Defect-3353 - Fixed the grpcFrame method leakage issue
vinodhabib Aug 22, 2024
01dc433
Defect-3353 - Added changes to use streamTransportState and streamLis…
vinodhabib Sep 30, 2024
8376fad
Merge branch 'master' into defect-3353-nettyTestsResourceLeakFix
vinodhabib Oct 3, 2024
8bfe353
Defect-3353 - Code cleanup
vinodhabib Oct 3, 2024
f316d5e
Defect-3353 - Fixed the PR check issue
vinodhabib Oct 3, 2024
1d9b406
Defect-3353 - Fixed the PR check issue
vinodhabib Oct 3, 2024
06f1692
Defect-3353 - Fixed the PR check issue
vinodhabib Oct 3, 2024
44197ee
Defect-3353 - Fixed checkstyle issues
vinodhabib Oct 3, 2024
08590fd
grpc-netty: Removing changes to use streamTransportState and streamLi…
vinodhabib Oct 14, 2024
454ff6a
grpc-netty: Fixed checkstyle issue
vinodhabib Oct 14, 2024
abdbb21
Merge branch 'master' into defect-3353-nettyTestsResourceLeakFix
vinodhabib Oct 14, 2024
e0ad32b
netty: Fixed recent review points
vinodhabib Oct 30, 2024
f0dcb2b
netty: added releaseOutbound() entry in setup() method for NettyClien…
vinodhabib Oct 30, 2024
31602a6
netty: reverted back the change to fix failing test in netty server
vinodhabib Oct 30, 2024
0ae38ee
netty: Fixed leakage issues of bdpPing UTs in Netty Client test
vinodhabib Nov 4, 2024
d0e893e
netty: Removed the releaseOutbound entry as its already handled in ra…
vinodhabib Nov 4, 2024
bbeb872
netty: Reverted back releaseOutboundMessage entry which is not needed
vinodhabib Nov 4, 2024
d0787e2
netty: Reverted back releaseOutboundMessage entry which is not needed…
vinodhabib Nov 4, 2024
efbc818
netty: Reverted back releaseOutboundMessage entry which is not needed…
vinodhabib Nov 4, 2024
54cc020
netty: Fixed Review point and leakage issue in dataSizeSincePingAccum…
vinodhabib Nov 5, 2024
6c67cea
netty: Fixed the recent review points
vinodhabib Nov 12, 2024
b99debf
netty: Reverted back the code which was part of trial and error to fi…
vinodhabib Nov 12, 2024
65399e8
netty: Fixed review points
vinodhabib Nov 13, 2024
0265c68
netty: Fixed review points
vinodhabib Nov 18, 2024
ea11932
netty: Reverting back the UTF_8 import change
vinodhabib Nov 18, 2024
8a61e6f
netty: Reverting back the UTF_8 import change
vinodhabib Nov 18, 2024
5814bb7
netty: Replaced \015 with \r to fix the checkstyle issue
vinodhabib Nov 26, 2024
da6595d
netty: Fixed review point by using pre-existing UTF_8 static import
vinodhabib Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
// Simulate receipt of initial remote settings.
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
channelRead(serializedSettings);
channel().releaseOutbound();
}

@Test
Expand Down Expand Up @@ -310,11 +311,12 @@ public void sendFrameShouldSucceed() throws Exception {
createStream();

// Send a frame and verify that it was written.
ByteBuf content = content();
ChannelFuture future
= enqueue(new SendGrpcFrameCommand(streamTransportState, content(), true));
= enqueue(new SendGrpcFrameCommand(streamTransportState, content, true));

assertTrue(future.isSuccess());
verifyWrite().writeData(eq(ctx()), eq(STREAM_ID), eq(content()), eq(0), eq(true),
verifyWrite().writeData(eq(ctx()), eq(STREAM_ID), same(content), eq(0), eq(true),
any(ChannelPromise.class));
verify(mockKeepAliveManager, times(1)).onTransportActive(); // onStreamActive
verifyNoMoreInteractions(mockKeepAliveManager);
Expand Down
41 changes: 24 additions & 17 deletions netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
Expand Down Expand Up @@ -68,6 +67,7 @@
import java.nio.ByteBuffer;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -84,7 +84,6 @@
public abstract class NettyHandlerTestBase<T extends Http2ConnectionHandler> {

protected static final int STREAM_ID = 3;
private ByteBuf content;

private EmbeddedChannel channel;

Expand All @@ -106,18 +105,24 @@ protected void manualSetUp() throws Exception {}
protected final TransportTracer transportTracer = new TransportTracer();
protected int flowControlWindow = DEFAULT_WINDOW_SIZE;
protected boolean autoFlowControl = false;

private final FakeClock fakeClock = new FakeClock();

FakeClock fakeClock() {
return fakeClock;
}

@After
public void tearDown() throws Exception {
if (channel() != null) {
channel().releaseInbound();
channel().releaseOutbound();
}
}

/**
* Must be called by subclasses to initialize the handler and channel.
*/
protected final void initChannel(Http2HeadersDecoder headersDecoder) throws Exception {
content = Unpooled.copiedBuffer("hello world", UTF_8);
frameWriter = mock(Http2FrameWriter.class, delegatesTo(new DefaultHttp2FrameWriter()));
frameReader = new DefaultHttp2FrameReader(headersDecoder);

Expand Down Expand Up @@ -233,11 +238,11 @@ protected final Http2FrameReader frameReader() {
}

protected final ByteBuf content() {
return content;
return Unpooled.copiedBuffer(contentAsArray());
}

protected final byte[] contentAsArray() {
return ByteBufUtil.getBytes(content());
return "\000\000\000\000\rhello world".getBytes(UTF_8);
}

protected final Http2FrameWriter verifyWrite() {
Expand All @@ -252,8 +257,8 @@ protected final void channelRead(Object obj) throws Exception {
channel.writeInbound(obj);
}

protected ByteBuf grpcDataFrame(int streamId, boolean endStream, byte[] content) {
final ByteBuf compressionFrame = Unpooled.buffer(content.length);
protected ByteBuf grpcFrame(byte[] message) {
final ByteBuf compressionFrame = Unpooled.buffer(message.length);
MessageFramer framer = new MessageFramer(
new MessageFramer.Sink() {
@Override
Expand All @@ -262,23 +267,22 @@ public void deliverFrame(
if (frame != null) {
ByteBuf bytebuf = ((NettyWritableBuffer) frame).bytebuf();
compressionFrame.writeBytes(bytebuf);
bytebuf.release();
}
}
},
new NettyWritableBufferAllocator(ByteBufAllocator.DEFAULT),
StatsTraceContext.NOOP);
framer.writePayload(new ByteArrayInputStream(content));
framer.flush();
ChannelHandlerContext ctx = newMockContext();
new DefaultHttp2FrameWriter().writeData(ctx, streamId, compressionFrame, 0, endStream,
newPromise());
return captureWrite(ctx);
framer.writePayload(new ByteArrayInputStream(message));
framer.close();
return compressionFrame;
}

protected final ByteBuf dataFrame(int streamId, boolean endStream, ByteBuf content) {
// Need to retain the content since the frameWriter releases it.
content.retain();
protected final ByteBuf grpcDataFrame(int streamId, boolean endStream, byte[] content) {
return dataFrame(streamId, endStream, grpcFrame(content));
}

protected final ByteBuf dataFrame(int streamId, boolean endStream, ByteBuf content) {
ChannelHandlerContext ctx = newMockContext();
new DefaultHttp2FrameWriter().writeData(ctx, streamId, content, 0, endStream, newPromise());
return captureWrite(ctx);
Expand Down Expand Up @@ -410,6 +414,7 @@ public void dataSizeSincePingAccumulates() throws Exception {
channelRead(dataFrame(3, false, buff.copy()));

assertEquals(length * 3, handler.flowControlPing().getDataSincePing());
buff.release();
}

@Test
Expand Down Expand Up @@ -608,12 +613,14 @@ public void bdpPingWindowResizing() throws Exception {

private void readPingAck(long pingData) throws Exception {
channelRead(pingFrame(true, pingData));
channel().releaseOutbound();
}

private void readXCopies(int copies, byte[] data) throws Exception {
for (int i = 0; i < copies; i++) {
channelRead(grpcDataFrame(STREAM_ID, false, data)); // buffer it
stream().request(1); // consume it
channel().releaseOutbound();
}
}

Expand Down
28 changes: 11 additions & 17 deletions netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -74,7 +75,6 @@
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2ServerHeadersDecoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -120,23 +120,16 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
public final TestRule globalTimeout = new DisableOnDebug(Timeout.seconds(10));
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();

private static final AsciiString HTTP_FAKE_METHOD = AsciiString.of("FAKE");


@Mock
private ServerStreamListener streamListener;

@Mock
private ServerStreamTracer.Factory streamTracerFactory;

private final ServerTransportListener transportListener =
mock(ServerTransportListener.class, delegatesTo(new ServerTransportListenerImpl()));
private final TestServerStreamTracer streamTracer = new TestServerStreamTracer();

private NettyServerStream stream;
private KeepAliveManager spyKeepAliveManager;

final Queue<InputStream> streamListenerMessageQueue = new LinkedList<>();

private int maxConcurrentStreams = Integer.MAX_VALUE;
Expand Down Expand Up @@ -207,6 +200,7 @@ protected void manualSetUp() throws Exception {
// Simulate receipt of initial remote settings.
ByteBuf serializedSettings = serializeSettings(new Http2Settings());
channelRead(serializedSettings);
channel().releaseOutbound();
}

@Test
Expand All @@ -228,10 +222,11 @@ public void sendFrameShouldSucceed() throws Exception {
createStream();

// Send a frame and verify that it was written.
ByteBuf content = content();
ChannelFuture future = enqueue(
new SendGrpcFrameCommand(stream.transportState(), content(), false));
new SendGrpcFrameCommand(stream.transportState(), content, false));
assertTrue(future.isSuccess());
verifyWrite().writeData(eq(ctx()), eq(STREAM_ID), eq(content()), eq(0), eq(false),
verifyWrite().writeData(eq(ctx()), eq(STREAM_ID), same(content), eq(0), eq(false),
any(ChannelPromise.class));
}

Expand Down Expand Up @@ -266,10 +261,11 @@ private void inboundDataShouldForwardToStreamListener(boolean endStream) throws
// Create a data frame and then trigger the handler to read it.
ByteBuf frame = grpcDataFrame(STREAM_ID, endStream, contentAsArray());
channelRead(frame);
channel().releaseOutbound();
verify(streamListener, atLeastOnce())
.messagesAvailable(any(StreamListener.MessageProducer.class));
InputStream message = streamListenerMessageQueue.poll();
assertArrayEquals(ByteBufUtil.getBytes(content()), ByteStreams.toByteArray(message));
assertArrayEquals(contentAsArray(), ByteStreams.toByteArray(message));
message.close();
assertNull("no additional message expected", streamListenerMessageQueue.poll());

Expand Down Expand Up @@ -869,7 +865,7 @@ public void keepAliveEnforcer_sendingDataResetsCounters() throws Exception {
future.get();
for (int i = 0; i < 10; i++) {
future = enqueue(
new SendGrpcFrameCommand(stream.transportState(), content().retainedSlice(), false));
new SendGrpcFrameCommand(stream.transportState(), content(), false));
future.get();
channel().releaseOutbound();
channelRead(pingFrame(false /* isAck */, 1L));
Expand Down Expand Up @@ -1292,6 +1288,7 @@ public void maxRstCount_withinLimit_succeeds() throws Exception {
maxRstPeriodNanos = TimeUnit.MILLISECONDS.toNanos(100);
manualSetUp();
rapidReset(maxRstCount);

assertTrue(channel().isOpen());
}

Expand All @@ -1301,6 +1298,7 @@ public void maxRstCount_exceedsLimit_fails() throws Exception {
maxRstPeriodNanos = TimeUnit.MILLISECONDS.toNanos(100);
manualSetUp();
assertThrows(ClosedChannelException.class, () -> rapidReset(maxRstCount + 1));

assertFalse(channel().isOpen());
}

Expand Down Expand Up @@ -1343,11 +1341,7 @@ private void createStream() throws Exception {

private ByteBuf emptyGrpcFrame(int streamId, boolean endStream) throws Exception {
ByteBuf buf = NettyTestUtil.messageFrame("");
try {
return dataFrame(streamId, endStream, buf);
} finally {
buf.release();
}
return dataFrame(streamId, endStream, buf);
}

@Override
Expand Down