[Broker] Support configuration of the memory allocated to replication producer #15691
Closed
Describe the bug
When replicating messages at high rates, brokers on the cluster originating the message geo-replication crash with the following error:
2022-05-19T18:35:09,916+0000 [BookKeeperClientWorker-OrderedExecutor-3-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://tenant/namespace/perftest-partition-33][pulsar-cluster-src -> pulsar-cluster-dst] Error producing on remote broker
org.apache.pulsar.client.api.PulsarClientException$MemoryBufferIsFullError: Client memory buffer is full
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:849) ~[com.datastax.oss-pulsar-client-original-2.10.0.3.jar:2.10.0.3]
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:429) ~[com.datastax.oss-pulsar-client-original-2.10.0.3.jar:2.10.0.3]
at org.apache.pulsar.broker.service.persistent.PersistentReplicator.lambda$readEntriesComplete$2(PersistentReplicator.java:369) ~[com.datastax.oss-pulsar-broker-2.10.0.3.jar:2.10.0.3]
at java.util.concurrent.CompletableFuture.uniAcceptNow(CompletableFuture.java:753) ~[?:?]
at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:731) ~[?:?]
at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2108) ~[?:?]
at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:367) ~[com.datastax.oss-pulsar-broker-2.10.0.3.jar:2.10.0.3]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:156) ~[com.datastax.oss-managed-ledger-2.10.0.3.jar:2.10.0.3]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [com.datastax.oss-managed-ledger-2.10.0.3.jar:2.10.0.3]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [com.datastax.oss-bookkeeper-common-4.14.5.1.0.0.jar:4.14.5.1.0.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.76.Final.jar:4.1.76.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Currently, only the broker configuration replicationProducerQueueSize is available to raise the replication production rate. It would be convenient to have a configuration key that sets the actual memory limit - what about replicationProducerMemoryLimit?
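For context, the buffer that fills up here is the Pulsar client's memory limit, which a standalone client can raise through its builder. The following is a minimal sketch, assuming the public ClientBuilder.memoryLimit(long, SizeUnit) API; the service URL and topic are placeholders, and the broker's internal replication client does not expose this setting today, which is exactly what a replicationProducerMemoryLimit key would address.

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SizeUnit;

public class ReplicationMemoryLimitSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL standing in for the remote cluster a replicator produces to.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://cluster-dst:6650")
                // Raising the client-side memory limit is the knob this issue asks to
                // expose for the broker's replication producers.
                .memoryLimit(128, SizeUnit.MEGA_BYTES)
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://tenant/namespace/perftest-partition-33")
                .blockIfQueueFull(true) // back-pressure instead of failing when the queue is full
                .create();

        producer.send("payload".getBytes());

        producer.close();
        client.close();
    }
}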
To Reproduce
Steps to reproduce the behavior:
- set up Pulsar geo-replication
- start a pulsar-perf producer on cluster-src to produce to cluster-dst with a high rate (at least 15K msg/s); a command sketch is shown after this list
- observe the reported errors showing up on cluster-src's brokers
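A possible pulsar-perf invocation for the producer step, assuming the standard produce sub-command with its --service-url and --rate options; the topic name and service URL are placeholders:

bin/pulsar-perf produce persistent://tenant/namespace/perftest \
    --service-url pulsar://cluster-src:6650 \
    --rate 20000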
Desktop (please complete the following information):
- Pulsar Helm deployment on a GKE cluster
Additional context
This is likely a consequence of the changes introduced by #13344.