From 7a20aeb897009a3961b44ff175bae6512f7499fe Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Thu, 9 Jul 2020 22:58:12 -0700 Subject: [PATCH] [pulsar-broker] Broker handle backpressure with max-pending message across topics to avoid OOM --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../pulsar/broker/ServiceConfiguration.java | 7 ++ .../pulsar/broker/service/BrokerService.java | 64 +++++++++++++++++ .../pulsar/broker/service/ServerCnx.java | 23 ++++++- .../auth/MockedPulsarServiceBaseTest.java | 2 +- .../service/BrokerServiceThrottlingTest.java | 69 ++++++++++++++++++- .../client/PulsarMockBookKeeper.java | 17 ++++- 8 files changed, 180 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 05c2747c2201c5..1fe0310e6e35c0 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -305,6 +305,9 @@ maxConcurrentTopicLoadRequest=5000 # Max concurrent non-persistent message can be processed per connection maxConcurrentNonPersistentMessagePerConnection=1000 +# Max number of concurrent pending published messages to control back-pressure across all topics on broker (0 to disable it) +maxConcurrentPendingPublishMessages=0 + # Number of worker threads to serve non-persistent topic numWorkerThreadsForNonPersistentTopic=8 diff --git a/conf/standalone.conf b/conf/standalone.conf index 2efc6511b0ae5f..2ead456a7f95cc 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -208,6 +208,9 @@ maxConcurrentTopicLoadRequest=5000 # Max concurrent non-persistent message can be processed per connection maxConcurrentNonPersistentMessagePerConnection=1000 +# Max number of concurrent pending published messages to control back-pressure across all topics on broker (0 to disable it) +maxConcurrentPendingPublishMessages=0 + # Number of worker threads to serve non-persistent topic numWorkerThreadsForNonPersistentTopic=8 diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index eb2d46a04c555d..32f2267801a808 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -601,6 +601,13 @@ public class ServiceConfiguration implements PulsarConfiguration { category = CATEGORY_SERVER, doc = "Max concurrent non-persistent message can be processed per connection") private int maxConcurrentNonPersistentMessagePerConnection = 1000; + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max number of concurrent pending published messages to control backpressure across " + + "all topics on broker ((0 to disable it)" + ) + private int maxConcurrentPendingPublishMessages = 0; @FieldContext( category = CATEGORY_SERVER, doc = "Number of worker threads to serve non-persistent topic") diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 34cca9046dd86b..d9bbd96c933066 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -229,6 +229,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener blockedDispatchers; + + // manage max pending messages across all topics in broker + private final LongAdder brokerPendingMessages; + private final ConcurrentOpenHashSet serverConnections; + private volatile boolean isPublishPaused = false; + private volatile ScheduledFuture publishResumeTask = null; + private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; @@ -303,6 +310,11 @@ public Map deserialize(String key, byte[] content) throws Except } }; this.blockedDispatchers = new 
ConcurrentOpenHashSet<>(); + this.serverConnections = new ConcurrentOpenHashSet<>(); + this.brokerPendingMessages = new LongAdder(); + if (pulsar.getConfiguration().getMaxConcurrentPendingPublishMessages() > 0) { + pulsar.getExecutor().scheduleAtFixedRate(() -> checkPendingPublishMessages(), 30, 5, TimeUnit.SECONDS); + } // update dynamic configuration and register-listener updateConfigurationAndRegisterListeners(); this.lookupRequestSemaphore = new AtomicReference( @@ -2307,4 +2319,56 @@ private boolean isSystemTopic(String topic) { public void setInterceptor(BrokerInterceptor interceptor) { this.interceptor = interceptor; } + + public ConcurrentOpenHashSet getServerConnections() { + return serverConnections; + } + + public void checkPendingPublishMessages() { + if (isPublishPaused) { + checkPublishMessageResume(); + } + long currentPendingMessages = brokerPendingMessages.sum(); + if (currentPendingMessages > pulsar().getConfiguration().getMaxConcurrentPendingPublishMessages()) { + // pause the reads + if (log.isDebugEnabled()) { + log.debug("Disabling auto-read as maxConcurrentPendingPublishMessages exceeded {}", + pulsar().getConfiguration().getMaxConcurrentPendingPublishMessages()); + } + serverConnections.forEach(cnx -> { + cnx.setAutoRead(false); + }); + isPublishPaused = true; + // schedule a timer to check again and resume reads if pending-message limit decreased + checkPublishMessageResume(); + } + } + + private void checkPublishMessageResume() { + if (publishResumeTask != null) { + return; + } + publishResumeTask = pulsar.getExecutor().schedule(() -> { + long pendingMessages = brokerPendingMessages.sum(); + if (pendingMessages < pulsar().getConfiguration().getMaxConcurrentPendingPublishMessages()) { + // resume the reads + if (log.isDebugEnabled()) { + log.debug("Enabling auto-read after cleaing max-pending messages"); + } + serverConnections.forEach(cnx -> { + cnx.setAutoRead(true); + }); + isPublishPaused = false; + publishResumeTask = null; + } else { 
+ publishResumeTask = null; + // else check again after a delay + checkPublishMessageResume(); + } + }, 1, TimeUnit.SECONDS); + } + + public LongAdder getBrokerPendingMessages() { + return brokerPendingMessages; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3131a7649cce4d..b7a6f70f62bab9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -43,6 +43,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; import javax.naming.AuthenticationException; @@ -149,6 +150,8 @@ public class ServerCnx extends PulsarHandler { private final int maxPendingSendRequests; private final int resumeReadsThreshold; private int pendingSendRequest = 0; + private LongAdder brokerPendingMessages; + private int maxConcurrentBrokerPendingPublishMessage; private final String replicatorPrefix; private String clientVersion = null; private int nonPersistentPendingMessages = 0; @@ -196,6 +199,11 @@ public ServerCnx(PulsarService pulsar) { this.resumeReadsThreshold = maxPendingSendRequests / 2; this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl(); this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); + this.maxConcurrentBrokerPendingPublishMessage = pulsar.getConfiguration().getMaxConcurrentPendingPublishMessages(); + if (maxConcurrentBrokerPendingPublishMessage > 0) { + service.getServerConnections().add(this); + } + brokerPendingMessages = service.getBrokerPendingMessages(); } @Override @@ -1774,6 +1782,7 @@ public void closeConsumer(Consumer consumer) { * consumers from 
connection-map */ protected void close() { + service.getServerConnections().remove(this); ctx.close(); } @@ -1822,7 +1831,7 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages) if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) { // When the quota of pending send requests is reached, stop reading from socket to cause backpressure on // client connection, possibly shared between multiple producers - ctx.channel().config().setAutoRead(false); + setAutoRead(false); autoReadDisabledRateLimiting = isPublishRateExceeded; } @@ -1830,19 +1839,29 @@ public void startSendOperation(Producer producer, int msgSize, int numMessages) ctx.channel().config().setAutoRead(false); autoReadDisabledPublishBufferLimiting = true; } + if (maxConcurrentBrokerPendingPublishMessage > 0) { + brokerPendingMessages.increment(); + } } public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) { MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, -msgSize); if (--pendingSendRequest == resumeReadsThreshold) { // Resume reading from socket - ctx.channel().config().setAutoRead(true); + setAutoRead(true); // triggers channel read if autoRead couldn't trigger it ctx.read(); } if (isNonPersistentTopic) { nonPersistentPendingMessages--; } + if (maxConcurrentBrokerPendingPublishMessage > 0) { + brokerPendingMessages.decrement(); + } + } + + public void setAutoRead(boolean autoRead) { + ctx.channel().config().setAutoRead(autoRead); } public void enableCnxAutoRead() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index f808b96de0114c..394cd9c0a0c6cf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -192,7 +192,7 @@ protected final void 
internalCleanup() throws Exception { admin = null; } if (pulsarClient != null) { - pulsarClient.close(); + pulsarClient.shutdown(); pulsarClient = null; } if (pulsar != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index 211b5278e0b42b..4c2b130c0bb4c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -23,9 +23,7 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.fail; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - +import java.security.GeneralSecurityException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,7 +33,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.PulsarMockBookKeeper; +import org.apache.bookkeeper.client.PulsarMockLedgerHandle; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.broker.service.BrokerServiceThrottlingTest.NoOpPulsarMockLedgerHandle; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -48,6 +51,11 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import io.netty.buffer.ByteBuf; + /** */ public class BrokerServiceThrottlingTest extends BrokerTestBase { @@ -258,4 +266,59 @@ private void upsertLookupPermits(int permits) throws Exception { ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); 
} } + + @Test + public void testBrokerMaxPendingMessages() throws Exception { + int maxConcurrentPendingPublishMessages = 10; + int totalTopics = 100; + conf.setMaxConcurrentPendingPublishMessages(maxConcurrentPendingPublishMessages); + restartBroker(); + final String topicName = "persistent://prop/ns-abc/maxPendingMsg"; + + org.apache.pulsar.client.api.Producer[] producers = new org.apache.pulsar.client.api.Producer[totalTopics]; + + // shutdown bookie so, broker will fail to publish + ((NonClosableMockBookKeeper) mockBookKeeper) + .setLedgerHandler((PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) -> { + try { + return new NoOpPulsarMockLedgerHandle(bk, id, digest, "test".getBytes()); + } catch (GeneralSecurityException e) { + e.printStackTrace(); + return null; + } + }); + + for (int i = 0; i < totalTopics; i++) { + producers[i] = pulsarClient.newProducer().sendTimeout(2, TimeUnit.MINUTES).topic(topicName + i).create(); + // producers[i].send("open-ledger".getBytes()); + } + + // publish maxConcurrentPendingPublishMessages + for (int i = 0; i < maxConcurrentPendingPublishMessages + 1; i++) { + producers[i].sendAsync("test".getBytes()); + } + Thread.sleep(2000); + pulsar.getBrokerService().checkPendingPublishMessages(); + + for (int i = maxConcurrentPendingPublishMessages + 1; i < totalTopics; i++) { + producers[i].sendAsync("test".getBytes()); + } + Thread.sleep(2000); + long totalPendingMessages = pulsar.getBrokerService().getBrokerPendingMessages().sum(); + assertNotEquals(totalPendingMessages, totalTopics); + ((NonClosableMockBookKeeper) mockBookKeeper).reallyShutdown(); + } + + static class NoOpPulsarMockLedgerHandle extends PulsarMockLedgerHandle { + + public NoOpPulsarMockLedgerHandle(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd) + throws GeneralSecurityException { + super(bk, id, digest, passwd); + } + + @Override + public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) { + // No-op + 
} + } } diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 291544e568603d..3c8f51bce53b85 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -52,6 +52,10 @@ public class PulsarMockBookKeeper extends BookKeeper { final ExecutorService executor; final ZooKeeper zkc; + PulsarLedgerHandleFactory ledgerHandler; + public static interface PulsarLedgerHandleFactory { + PulsarMockLedgerHandle create(PulsarMockBookKeeper bk, long id, DigestType digest, byte[] passwd); + } @Override public ZooKeeper getZkHandle() { @@ -93,7 +97,8 @@ public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSiz try { long id = sequence.getAndIncrement(); log.info("Creating ledger {}", id); - PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); + PulsarMockLedgerHandle lh = ledgerHandler != null ? ledgerHandler.create(this, id, digestType, passwd) + : new PulsarMockLedgerHandle(PulsarMockBookKeeper.this, id, digestType, passwd); ledgers.put(id, lh); return FutureUtils.value(lh); } catch (Throwable t) { @@ -116,7 +121,8 @@ public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorum try { long id = sequence.getAndIncrement(); log.info("Creating ledger {}", id); - PulsarMockLedgerHandle lh = new PulsarMockLedgerHandle(this, id, digestType, passwd); + PulsarMockLedgerHandle lh = ledgerHandler != null ? 
ledgerHandler.create(this, id, digestType, passwd) + : new PulsarMockLedgerHandle(this, id, digestType, passwd); ledgers.put(id, lh); return lh; } catch (Throwable t) { @@ -323,5 +329,12 @@ static int getExceptionCode(Throwable t) { } } + public PulsarLedgerHandleFactory getLedgerHandler() { + return ledgerHandler; + } + + public void setLedgerHandler(PulsarLedgerHandleFactory ledgerHandler) { + this.ledgerHandler = ledgerHandler; + } private static final Logger log = LoggerFactory.getLogger(PulsarMockBookKeeper.class); }