Skip to content

[Bug] ShadowTopicTest: "LEAK: ByteBuf.release() was not called" #25257

@dlg99

Description

@dlg99

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

jdk 17, master branch at commit 1f86c24

Issue Description

run a test, e.g.

cd pulsar-broker
mvn test -Dtest=org.apache.pulsar.broker.service.persistent.ShadowTopicTest#testPartitionedShadowTopicProduceAndConsume

look at the log target/surefire-reports/org.apache.pulsar.broker.service.persistent.ShadowTopicTest-output.txt
see

ERROR - [main:ResourceLeakDetector] - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
#1:
        org.apache.bookkeeper.mledger.impl.EntryImpl.deallocate(EntryImpl.java:285)
        org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release0(AbstractCASReferenceCounted.java:102)
        org.apache.bookkeeper.mledger.util.AbstractCASReferenceCounted.release(AbstractCASReferenceCounted.java:85)
        org.apache.pulsar.broker.service.persistent.PersistentReplicator$ProducerSendCallback.sendComplete(PersistentReplicator.java:421)
        org.apache.pulsar.client.impl.ProducerImpl$OpSendMsg.sendComplete(ProducerImpl.java:1702)
        org.apache.pulsar.client.impl.GeoReplicationProducerImpl.removeAndApplyCallback(GeoReplicationProducerImpl.java:252)
        org.apache.pulsar.client.impl.GeoReplicationProducerImpl.ackReceivedReplicatedMsg(GeoReplicationProducerImpl.java:160)
        org.apache.pulsar.client.impl.GeoReplicationProducerImpl.ackReceived(GeoReplicationProducerImpl.java:80)
        org.apache.pulsar.client.impl.ClientCnx.handleSendReceipt(ClientCnx.java:506)
        org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:238)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:361)
        io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:348)
        io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:470)
        io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
        io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:171)
        io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
        io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
        io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:842)
#2:
        org.apache.pulsar.broker.service.persistent.ShadowReplicator.replicateEntries(ShadowReplicator.java:131)
        org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:367)
        org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$complete$1(OpReadEntry.java:261)
        org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137)
        org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:842)
Created at:
        Hint: Test: org.apache.pulsar.broker.service.persistent.ShadowTopicTest.testPartitionedShadowTopicProduceAndConsume
        io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:144)
        io.netty.buffer.SimpleLeakAwareByteBuf.retainedDuplicate(SimpleLeakAwareByteBuf.java:62)
        io.netty.buffer.AdvancedLeakAwareByteBuf.retainedDuplicate(AdvancedLeakAwareByteBuf.java:102)
        org.apache.bookkeeper.mledger.impl.EntryImpl.create(EntryImpl.java:184)
        org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.doAsyncReadEntriesByPosition(RangeEntryCacheImpl.java:404)
        org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntriesByPosition(RangeEntryCacheImpl.java:308)
        org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntry0(RangeEntryCacheImpl.java:287)
        org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.asyncReadEntry(RangeEntryCacheImpl.java:269)
        org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntry(ManagedLedgerImpl.java:2442)
        org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalReadFromLedger(ManagedLedgerImpl.java:2412)
        org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.asyncReadEntries(ManagedLedgerImpl.java:2088)
        org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.checkForNewEntries(ManagedCursorImpl.java:1187)
        org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.lambda$asyncReadEntriesWithSkipOrWait$17(ManagedCursorImpl.java:1122)
        org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:128)
        com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:74)
        com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:80)
        org.apache.bookkeeper.common.util.SingleThreadSafeScheduledExecutorService$SafeRunnable.run(SingleThreadSafeScheduledExecutorService.java:46)
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        java.base/java.lang.Thread.run(Thread.java:842)

Error messages

see above

Reproducing the issue

see above

Additional information

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

No one assigned

    Labels

    type/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions