diff --git a/buffer/src/main/java/io/netty/buffer/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/ByteBuf.java index cfef2c4b0b85..76a68e881e39 100644 --- a/buffer/src/main/java/io/netty/buffer/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/ByteBuf.java @@ -1808,6 +1808,14 @@ interface Unsafe { */ ByteBuf newBuffer(int initialCapacity); + /** + * Similar to {@link ByteBuf#discardReadBytes()} except that this method might discard + * some, all, or none of read bytes depending on its internal implementation to reduce + * overall memory bandwidth consumption at the cost of potentially additional memory + * consumption. + */ + void discardSomeReadBytes(); + /** * Increases the reference count of the buffer. */ diff --git a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java index 6e71b9d1cafb..59bc6bad3d4c 100644 --- a/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DefaultCompositeByteBuf.java @@ -1201,6 +1201,11 @@ public ByteBuf newBuffer(int initialCapacity) { return buf; } + @Override + public void discardSomeReadBytes() { + discardReadComponents(); + } + @Override public void acquire() { if (refCnt <= 0) { diff --git a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java index e1c95330d87b..07f9802b0315 100644 --- a/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DirectByteBuf.java @@ -413,6 +413,19 @@ public ByteBuf newBuffer(int initialCapacity) { return new DirectByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); } + @Override + public void discardSomeReadBytes() { + final int readerIndex = readerIndex(); + if (readerIndex == writerIndex()) { + discardReadBytes(); + return; + } + + if (readerIndex > 0 && readerIndex >= capacity >>> 1) { + discardReadBytes(); + } + } + @Override public void acquire() { if (refCnt <= 0) { diff --git a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java index 1dd6235f0645..7cf17084d00f 100644 --- a/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/DuplicatedByteBuf.java @@ -228,6 +228,11 @@ public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); } + @Override + public void discardSomeReadBytes() { + throw new UnsupportedOperationException(); + } + @Override public void acquire() { buffer.unsafe().acquire(); diff --git a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java index 8f9ef3eadf1f..5b19e2185c85 100644 --- a/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/HeapByteBuf.java @@ -302,6 +302,19 @@ public ByteBuf newBuffer(int initialCapacity) { return new HeapByteBuf(initialCapacity, Math.max(initialCapacity, maxCapacity())); } + @Override + public void discardSomeReadBytes() { + final int readerIndex = readerIndex(); + if (readerIndex == writerIndex()) { + discardReadBytes(); + return; + } + + if (readerIndex > 0 && readerIndex >= capacity() >>> 1) { + discardReadBytes(); + } + } + @Override public void acquire() { if (refCnt <= 0) { diff --git a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java index 9a866d951555..4a11e5356697 100644 --- a/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/SlicedByteBuf.java @@ -295,6 +295,11 @@ public ByteBuf newBuffer(int initialCapacity) { return buffer.unsafe().newBuffer(initialCapacity); } + @Override + public void discardSomeReadBytes() { + throw new UnsupportedOperationException(); + } + @Override public void acquire() { buffer.unsafe().acquire(); diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java index 08deb385f590..14a956bc5614 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java @@ -46,7 +46,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } if (out.readableBytes() > oldOutSize) { - in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } @@ -71,8 +70,8 @@ private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { } } + in.unsafe().discardSomeReadBytes(); if (out.readableBytes() > oldOutSize) { - in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java index 849928454ce5..55ef2ac98f86 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java @@ -44,10 +44,7 @@ public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Except } } - if (out.readableBytes() > oldOutSize) { - in.discardReadBytes(); - } - + in.unsafe().discardSomeReadBytes(); ctx.flush(future); } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index c0431e3f26e5..ef802be9848b 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -52,7 +52,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { try { if (CodecUtil.unfoldAndAdd(ctx, decodeLast(ctx, in), true)) { - in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } } catch (Throwable t) { @@ -93,9 +92,10 @@ protected void callDecode(ChannelHandlerContext ctx) { break; } } catch (Throwable t) { + in.unsafe().discardSomeReadBytes(); + if (decoded) { decoded = false; - in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } @@ -107,8 +107,9 @@ protected void callDecode(ChannelHandlerContext ctx) { } } + in.unsafe().discardSomeReadBytes(); + if (decoded) { - in.discardReadBytes(); ctx.fireInboundBufferUpdated(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index ff497ef5d91a..d0bd9dcd5fdb 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -455,8 +455,10 @@ protected void callDecode(ChannelHandlerContext ctx) { } private void fireInboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) { - checkpoint -= in.readerIndex(); - in.discardReadBytes(); + final int oldReaderIndex = in.readerIndex(); + in.unsafe().discardSomeReadBytes(); + final int newReaderIndex = in.readerIndex(); + checkpoint -= oldReaderIndex - newReaderIndex; ctx.fireInboundBufferUpdated(); } } diff --git a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java index a01ad02ac3e8..519a135d5991 100644 --- a/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/serialization/CompatibleObjectEncoder.java @@ -100,7 +100,7 @@ public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Ex oos.reset(); // Also discard the byproduct to avoid OOM on the sending side. - out.discardReadBytes(); + out.unsafe().discardSomeReadBytes(); } } diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 70d9e8e45bbc..dcbe670ced24 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -336,7 +336,7 @@ public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws final ByteBuf in = ctx.outboundByteBuffer(); final ByteBuf out = ctx.nextOutboundByteBuffer(); - out.discardReadBytes(); + out.unsafe().discardSomeReadBytes(); // Do not encrypt the first write request if this handler is // created with startTLS flag turned on. @@ -398,9 +398,7 @@ public void flush(final ChannelHandlerContext ctx, ChannelFuture future) throws setHandshakeFailure(e); throw e; } finally { - if (bytesProduced > 0) { - in.discardReadBytes(); - } + in.unsafe().discardSomeReadBytes(); ctx.flush(future); } } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 012e9e8a2c8a..4423bfd6c637 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -441,7 +441,7 @@ public void flush(ChannelHandlerContext ctx, out.add(msg); } } - in.discardReadBytes(); + in.unsafe().discardSomeReadBytes(); if (swallow) { future.setSuccess(); } else {